MAPREDUCE-7322. revisiting TestMRIntermediateDataEncryption. Contributed by Ahmed Hussein.
(cherry picked from commit 38f86cc8c7)

parent 2439e07708
commit a92300e0a9
@@ -165,10 +165,11 @@ public class JarFinder {
       if (!testDir.exists()) {
         testDir.mkdirs();
       }
-      File tempJar = File.createTempFile("hadoop-", "", testDir);
-      tempJar = new File(tempJar.getAbsolutePath() + ".jar");
+      File tempFile = File.createTempFile("hadoop-", "", testDir);
+      File tempJar = new File(tempFile.getAbsolutePath() + ".jar");
       createJar(baseDir, tempJar);
       tempJar.deleteOnExit();
+      tempFile.deleteOnExit();
       return tempJar.getAbsolutePath();
     }
   }
@@ -39,6 +39,7 @@ import java.util.Map;

 import java.util.concurrent.TimeoutException;

+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
 import org.junit.Assert;

@@ -104,6 +105,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;
@@ -113,15 +116,24 @@ import org.slf4j.LoggerFactory;
 public class TestRecovery {

   private static final Logger LOG = LoggerFactory.getLogger(TestRecovery.class);
-  private static Path outputDir = new Path(new File("target",
-      TestRecovery.class.getName()).getAbsolutePath() +
-      Path.SEPARATOR + "out");
+
+  private static File testRootDir;
+  private static Path outputDir;
   private static String partFile = "part-r-00000";
   private Text key1 = new Text("key1");
   private Text key2 = new Text("key2");
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");

+  @BeforeClass
+  public static void setupClass() throws Exception {
+    // setup the test root directory
+    testRootDir =
+        GenericTestUtils.setupTestRootDir(
+            TestRecovery.class);
+    outputDir = new Path(testRootDir.getAbsolutePath(), "out");
+  }
+
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * completely disappears because of failed launch, one attempt gets killed and
@@ -599,14 +611,13 @@ public class TestRecovery {
     MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(),
         true, ++runCount) {
     };
-    Configuration conf = new Configuration();
+    Configuration conf =
+        MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
     conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
     conf.setBoolean("mapred.mapper.new-api", true);
     conf.setBoolean("mapred.reducer.new-api", true);
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
     conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
-
     // run the MR job at the first attempt
     Job jobAttempt1 = app.submit(conf);
     app.waitForState(jobAttempt1, JobState.RUNNING);
@@ -42,7 +42,8 @@ import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -576,7 +577,7 @@ public class BackupStore<K,V> {
       file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(),
           -1, conf);
       FSDataOutputStream out = fs.create(file);
-      out = CryptoUtils.wrapIfNecessary(conf, out);
+      out = IntermediateEncryptedStream.wrapIfNecessary(conf, out, tmp);
       return new Writer<K, V>(conf, out, null, null, null, null, true);
     }
   }
@@ -63,6 +63,7 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.mapreduce.CryptoUtils;
@@ -1630,7 +1631,9 @@
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
-            partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
+            partitionOut =
+                IntermediateEncryptedStream.wrapIfNecessary(job, out, false,
+                    filename);
             writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                       spilledRecordsCounter);
             if (combinerRunner == null) {
@@ -1687,6 +1690,7 @@
           Path indexFilename =
               mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                   * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          IntermediateEncryptedStream.addSpillIndexFile(indexFilename, job);
           spillRec.writeToFile(indexFilename, job);
         } else {
           indexCacheList.add(spillRec);
@@ -1727,7 +1731,9 @@
           try {
             long segmentStart = out.getPos();
             // Create a new codec, don't care!
-            partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
+            partitionOut =
+                IntermediateEncryptedStream.wrapIfNecessary(job, out, false,
+                    filename);
             writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
                                             spilledRecordsCounter);

@@ -1761,6 +1767,7 @@
           Path indexFilename =
               mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                   * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          IntermediateEncryptedStream.addSpillIndexFile(indexFilename, job);
           spillRec.writeToFile(indexFilename, job);
         } else {
           indexCacheList.add(spillRec);
@@ -1854,15 +1861,19 @@
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
       if (numSpills == 1) { //the spill is the final output
+        Path indexFileOutput =
+            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
         sameVolRename(filename[0],
             mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
         if (indexCacheList.size() == 0) {
-          sameVolRename(mapOutputFile.getSpillIndexFile(0),
-              mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+          Path indexFilePath = mapOutputFile.getSpillIndexFile(0);
+          IntermediateEncryptedStream.validateSpillIndexFile(
+              indexFilePath, job);
+          sameVolRename(indexFilePath, indexFileOutput);
         } else {
-          indexCacheList.get(0).writeToFile(
-              mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
+          indexCacheList.get(0).writeToFile(indexFileOutput, job);
         }
+        IntermediateEncryptedStream.addSpillIndexFile(indexFileOutput, job);
         sortPhase.complete();
         return;
       }
@@ -1870,6 +1881,7 @@
       // read in paged indices
       for (int i = indexCacheList.size(); i < numSpills; ++i) {
         Path indexFileName = mapOutputFile.getSpillIndexFile(i);
+        IntermediateEncryptedStream.validateSpillIndexFile(indexFileName, job);
         indexCacheList.add(new SpillRecord(indexFileName, job));
       }

@@ -1881,7 +1893,7 @@
           mapOutputFile.getOutputFileForWrite(finalOutFileSize);
       Path finalIndexFile =
           mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
+      IntermediateEncryptedStream.addSpillIndexFile(finalIndexFile, job);
       //The output stream for the final single output file
       FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
       FSDataOutputStream finalPartitionOut = null;
@@ -1893,8 +1905,9 @@
       try {
         for (int i = 0; i < partitions; i++) {
           long segmentStart = finalOut.getPos();
-          finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut,
-              false);
+          finalPartitionOut =
+              IntermediateEncryptedStream.wrapIfNecessary(job, finalOut,
+                  false, finalOutputFile);
           Writer<K, V> writer =
               new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
           writer.close();
@@ -1957,7 +1970,8 @@

         //write merged output to disk
         long segmentStart = finalOut.getPos();
-        finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, false);
+        finalPartitionOut = IntermediateEncryptedStream.wrapIfNecessary(job,
+            finalOut, false, finalOutputFile);
         Writer<K, V> writer =
             new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
                              spilledRecordsCounter);
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
@@ -302,7 +303,7 @@ public class Merger {
         FSDataInputStream in = fs.open(file);

         in.seek(segmentOffset);
-        in = CryptoUtils.wrapIfNecessary(conf, in);
+        in = IntermediateEncryptedStream.wrapIfNecessary(conf, in, file);
         reader = new Reader<K, V>(conf, in,
             segmentLength - CryptoUtils.cryptoPadding(conf),
             codec, readsCounter);
@@ -730,7 +731,8 @@ public class Merger {
                                          approxOutputSize, conf);

       FSDataOutputStream out = fs.create(outputFile);
-      out = CryptoUtils.wrapIfNecessary(conf, out);
+      out = IntermediateEncryptedStream.wrapIfNecessary(conf, out,
+          outputFile);
       Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,
                                              codec, writesCounter, true);
       writeFile(this, writer, reporter, conf);
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+
+/**
+ * Used to wrap helpers while spilling intermediate files.
+ * Setting the {@link SpillCallBackInjector} helps in:
+ * 1- adding callbacks to capture the path of the spilled files.
+ * 2- Verifying the encryption when intermediate encryption is enabled.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class IntermediateEncryptedStream {
+
+  private static SpillCallBackInjector prevSpillCBInjector = null;
+
+  public static FSDataOutputStream wrapIfNecessary(Configuration conf,
+      FSDataOutputStream out, Path outPath) throws IOException {
+    SpillCallBackInjector.get().writeSpillFileCB(outPath, out, conf);
+    return CryptoUtils.wrapIfNecessary(conf, out, true);
+  }
+
+  public static FSDataOutputStream wrapIfNecessary(Configuration conf,
+      FSDataOutputStream out, boolean closeOutputStream,
+      Path outPath) throws IOException {
+    SpillCallBackInjector.get().writeSpillFileCB(outPath, out, conf);
+    return CryptoUtils.wrapIfNecessary(conf, out, closeOutputStream);
+  }
+
+  public static FSDataInputStream wrapIfNecessary(Configuration conf,
+      FSDataInputStream in, Path inputPath) throws IOException {
+    SpillCallBackInjector.get().getSpillFileCB(inputPath, in, conf);
+    return CryptoUtils.wrapIfNecessary(conf, in);
+  }
+
+  public static InputStream wrapIfNecessary(Configuration conf,
+      InputStream in, long length, Path inputPath) throws IOException {
+    SpillCallBackInjector.get().getSpillFileCB(inputPath, in, conf);
+    return CryptoUtils.wrapIfNecessary(conf, in, length);
+  }
+
+  public static void addSpillIndexFile(Path indexFilename, Configuration conf) {
+    SpillCallBackInjector.get().addSpillIndexFileCB(indexFilename, conf);
+  }
+
+  public static void validateSpillIndexFile(Path indexFilename,
+      Configuration conf) {
+    SpillCallBackInjector.get().validateSpillIndexFileCB(indexFilename, conf);
+  }
+
+  public static SpillCallBackInjector resetSpillCBInjector() {
+    return setSpillCBInjector(prevSpillCBInjector);
+  }
+
+  public synchronized static SpillCallBackInjector setSpillCBInjector(
+      SpillCallBackInjector spillInjector) {
+    prevSpillCBInjector =
+        SpillCallBackInjector.getAndSet(spillInjector);
+    return spillInjector;
+  }
+
+  private IntermediateEncryptedStream() {}
+}
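
Note: the class above is the production-side seam for the spill-encryption tests. Each wrapIfNecessary overload reports the spill path to the currently installed SpillCallBackInjector before delegating to CryptoUtils as before, and tests install an injector around the code under test and restore the previous one afterwards. The sketch below only illustrates that flow using the SpillCallBackPathsFinder introduced later in this commit; the job-running helper and the exact assertions are assumptions, not part of this change:

    // Illustrative use of the injector hooks added in this commit.
    SpillCallBackPathsFinder spillInjector = new SpillCallBackPathsFinder();
    IntermediateEncryptedStream.setSpillCBInjector(spillInjector);
    try {
      runJobWithEncryptedSpillEnabled(conf);  // placeholder for the code under test
      // With encryption enabled, recorded spills should land in the encrypted bucket.
      Assert.assertFalse(spillInjector.getEncryptedSpilledFiles().isEmpty());
      Assert.assertTrue(spillInjector.getInvalidSpillEntries().isEmpty());
    } finally {
      IntermediateEncryptedStream.resetSpillCBInjector();
    }
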
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Used for injecting callbacks while spilling files.
+ * Calls into this are a no-op in production code.
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class SpillCallBackInjector {
+  private static SpillCallBackInjector instance = new SpillCallBackInjector();
+  public static SpillCallBackInjector get() {
+    return instance;
+  }
+  /**
+   * Sets the global SpillFilesCBInjector to the new value, returning the old
+   * value.
+   *
+   * @param spillInjector the new implementation for the spill injector.
+   * @return the previous implementation.
+   */
+  public static SpillCallBackInjector getAndSet(
+      SpillCallBackInjector spillInjector) {
+    SpillCallBackInjector prev = instance;
+    instance = spillInjector;
+    return prev;
+  }
+
+  public void writeSpillIndexFileCB(Path path) {
+    // do nothing
+  }
+
+  public void writeSpillFileCB(Path path, FSDataOutputStream out,
+      Configuration conf) {
+    // do nothing
+  }
+
+  public void getSpillFileCB(Path path, InputStream is, Configuration conf) {
+    // do nothing
+  }
+
+  public String getSpilledFileReport() {
+    return null;
+  }
+
+  public void handleErrorInSpillFill(Path path, Exception e) {
+    // do nothing
+  }
+
+  public void corruptSpilledFile(Path fileName) throws IOException {
+    // do nothing
+  }
+
+  public void addSpillIndexFileCB(Path path, Configuration conf) {
+    // do nothing
+  }
+
+  public void validateSpillIndexFileCB(Path path, Configuration conf) {
+    // do nothing
+  }
+}
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoStreamUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+
+/**
+ * An implementation class that keeps track of the spilled files.
+ */
+public class SpillCallBackPathsFinder extends SpillCallBackInjector {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SpillCallBackPathsFinder.class);
+  /**
+   * Encrypted spilled files.
+   */
+  private final Map<Path, Set<Long>> encryptedSpillFiles =
+      Collections.synchronizedMap(new ConcurrentHashMap<>());
+  /**
+   * Non-Encrypted spilled files.
+   */
+  private final Map<Path, Set<Long>> spillFiles =
+      Collections.synchronizedMap(new ConcurrentHashMap<>());
+  /**
+   * Invalid position access.
+   */
+  private final Map<Path, Set<Long>> invalidAccessMap =
+      Collections.synchronizedMap(new ConcurrentHashMap<>());
+  /**
+   * Index spill files.
+   */
+  private final Set<Path> indexSpillFiles = ConcurrentHashMap.newKeySet();
+  /**
+   * Paths that were not found in the maps.
+   */
+  private final Set<Path> negativeCache = ConcurrentHashMap.newKeySet();
+
+  protected Map<Path, Set<Long>> getFilesMap(Configuration config) {
+    if (CryptoUtils.isEncryptedSpillEnabled(config)) {
+      return encryptedSpillFiles;
+    }
+    return spillFiles;
+  }
+
+  @Override
+  public void writeSpillFileCB(Path path, FSDataOutputStream out,
+      Configuration conf) {
+    long outPos = out.getPos();
+    getFilesMap(conf)
+        .computeIfAbsent(path, p -> ConcurrentHashMap.newKeySet())
+        .add(outPos);
+    LOG.debug("writeSpillFileCB.. path:{}; pos:{}", path, outPos);
+  }
+
+  @Override
+  public void getSpillFileCB(Path path, InputStream is, Configuration conf) {
+    if (path == null) {
+      return;
+    }
+    Set<Long> pathEntries = getFilesMap(conf).get(path);
+    if (pathEntries != null) {
+      try {
+        long isPos = CryptoStreamUtils.getInputStreamOffset(is);
+        if (pathEntries.contains(isPos)) {
+          LOG.debug("getSpillFileCB... Path {}; Pos: {}", path, isPos);
+          return;
+        }
+        invalidAccessMap
+            .computeIfAbsent(path, p -> ConcurrentHashMap.newKeySet())
+            .add(isPos);
+        LOG.debug("getSpillFileCB... access incorrect position.. "
+            + "Path {}; Pos: {}", path, isPos);
+      } catch (IOException e) {
+        LOG.error("Could not get inputStream position.. Path {}", path, e);
+        // do nothing
+      }
+      return;
+    }
+    negativeCache.add(path);
+    LOG.warn("getSpillFileCB.. Could not find spilled file .. Path: {}", path);
+  }
+
+  @Override
+  public String getSpilledFileReport() {
+    StringBuilder strBuilder =
+        new StringBuilder("\n++++++++ Spill Report ++++++++")
+            .append(dumpMapEntries("Encrypted Spilled Files",
+                encryptedSpillFiles))
+            .append(dumpMapEntries("Non-Encrypted Spilled Files",
+                spillFiles))
+            .append(dumpMapEntries("Invalid Spill Access",
+                invalidAccessMap))
+            .append("\n ----- Spilled Index Files ----- ")
+            .append(indexSpillFiles.size());
+    for (Path p : indexSpillFiles) {
+      strBuilder.append("\n\t index-path: ").append(p.toString());
+    }
+    strBuilder.append("\n ----- Negative Cache files ----- ")
+        .append(negativeCache.size());
+    for (Path p : negativeCache) {
+      strBuilder.append("\n\t path: ").append(p.toString());
+    }
+    return strBuilder.toString();
+  }
+
+  @Override
+  public void addSpillIndexFileCB(Path path, Configuration conf) {
+    if (path == null) {
+      return;
+    }
+    indexSpillFiles.add(path);
+    LOG.debug("addSpillIndexFileCB... Path: {}", path);
+  }
+
+  @Override
+  public void validateSpillIndexFileCB(Path path, Configuration conf) {
+    if (path == null) {
+      return;
+    }
+    if (indexSpillFiles.contains(path)) {
+      LOG.debug("validateSpillIndexFileCB.. Path: {}", path);
+      return;
+    }
+    LOG.warn("validateSpillIndexFileCB.. could not retrieve indexFile.. "
+        + "Path: {}", path);
+    negativeCache.add(path);
+  }
+
+  public Set<Path> getEncryptedSpilledFiles() {
+    return Collections.unmodifiableSet(encryptedSpillFiles.keySet());
+  }
+
+  /**
+   * Gets the set of path:pos of the entries that were accessed incorrectly.
+   * @return a set of string in the format of {@literal Path[Pos]}
+   */
+  public Set<String> getInvalidSpillEntries() {
+    Set<String> result = new LinkedHashSet<>();
+    for (Entry<Path, Set<Long>> spillMapEntry: invalidAccessMap.entrySet()) {
+      for (Long singleEntry : spillMapEntry.getValue()) {
+        result.add(String.format("%s[%d]",
+            spillMapEntry.getKey(), singleEntry));
+      }
+    }
+    return result;
+  }
+
+  private String dumpMapEntries(String label,
+      Map<Path, Set<Long>> entriesMap) {
+    StringBuilder strBuilder =
+        new StringBuilder(String.format("%n ----- %s ----- %d", label,
+            entriesMap.size()));
+    for (Entry<Path, Set<Long>> encryptedSpillEntry
+        : entriesMap.entrySet()) {
+      strBuilder.append(String.format("%n\t\tpath: %s",
+          encryptedSpillEntry.getKey()));
+      for (Long singlePos : encryptedSpillEntry.getValue()) {
+        strBuilder.append(String.format("%n\t\t\tentry: %d", singlePos));
+      }
+    }
+    return strBuilder.toString();
+  }
+}
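
Note: SpillCallBackPathsFinder records, per spill path, the stream positions that were written and later read back, bucketed by whether encrypted spill was enabled at write time; reads at unexpected offsets end up in the invalid-access map. Besides asserting on the individual getters, a test can dump a summary report. A rough illustration of those methods (the LOG field and the surrounding test body are assumed):

    SpillCallBackPathsFinder finder = new SpillCallBackPathsFinder();
    IntermediateEncryptedStream.setSpillCBInjector(finder);
    // ... exercise the map-side spill / shuffle code under test ...
    LOG.info(finder.getSpilledFileReport());
    for (String badEntry : finder.getInvalidSpillEntries()) {
      Assert.fail("Unexpected access position for spilled data: " + badEntry);
    }
    IntermediateEncryptedStream.resetSpillCBInjector();
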
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+/**
+ * Helper classes for the shuffle/spill encryptions.
+ */
+package org.apache.hadoop.mapreduce.security;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -512,7 +513,9 @@ class Fetcher<K,V> extends Thread {
      }

      InputStream is = input;
-     is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
+     is =
+         IntermediateEncryptedStream.wrapIfNecessary(jobConf, is,
+             compressedLength, null);
      compressedLength -= CryptoUtils.cryptoPadding(jobConf);
      decompressedLength -= CryptoUtils.cryptoPadding(jobConf);

@@ -36,6 +36,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SpillRecord;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -151,7 +153,9 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     FileSystem localFs = FileSystem.getLocal(job).getRaw();
     FSDataInputStream inStream = localFs.open(mapOutputFileName);
     try {
-      inStream = CryptoUtils.wrapIfNecessary(job, inStream);
+      inStream =
+          IntermediateEncryptedStream.wrapIfNecessary(job, inStream,
+              mapOutputFileName);
       inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
       mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
           decompressedLength, metrics, reporter);
@@ -53,7 +53,7 @@ import org.apache.hadoop.mapred.Task.CombineValuesIterator;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -468,7 +468,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                                          mergeOutputSize).suffix(
                                              Task.MERGED_OUTPUT_PREFIX);

-    FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+    FSDataOutputStream out =
+        IntermediateEncryptedStream.wrapIfNecessary(jobConf,
+            rfs.create(outputPath), outputPath);
     Writer<K, V> writer = new Writer<K, V>(jobConf, out,
         (Class<K>) jobConf.getMapOutputKeyClass(),
         (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
@@ -552,7 +554,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
         localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
             approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);

-    FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+    FSDataOutputStream out =
+        IntermediateEncryptedStream.wrapIfNecessary(jobConf,
+            rfs.create(outputPath), outputPath);
     Writer<K, V> writer = new Writer<K, V>(jobConf, out,
         (Class<K>) jobConf.getMapOutputKeyClass(),
         (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
@@ -735,7 +739,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
           tmpDir, comparator, reporter, spilledRecordsCounter, null,
           mergePhase);

-      FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath));
+      FSDataOutputStream out =
+          IntermediateEncryptedStream.wrapIfNecessary(job,
+              fs.create(outputPath), outputPath);
       Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
                                              codec, null, true);
       try {
@@ -37,7 +37,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapOutputFile;

 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;

 import com.google.common.annotations.VisibleForTesting;
@@ -84,7 +84,8 @@ class OnDiskMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
     this.fs = fs;
     this.outputPath = outputPath;
     tmpOutputPath = getTempPath(outputPath, fetcher);
-    disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
+    disk = IntermediateEncryptedStream.wrapIfNecessary(conf,
+        fs.create(tmpOutputPath), tmpOutputPath);
   }

   @VisibleForTesting
@@ -17,14 +17,22 @@
  */
 package org.apache.hadoop.mapreduce.util;

+import java.io.File;
 import java.util.concurrent.TimeUnit;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;

 /**
  * A class that contains utility methods for MR Job configuration.
  */
 public final class MRJobConfUtil {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MRJobConfUtil.class);
   public static final String REDACTION_REPLACEMENT_VAL = "*********(redacted)";

   /**
@@ -130,4 +138,54 @@ public final class MRJobConfUtil {
   public static double convertTaskProgressToFactor(final float progress) {
     return Math.floor(progress * MRJobConfUtil.PROGRESS_MIN_DELTA_FACTOR);
   }
+
+  /**
+   * For unit tests, use urandom to avoid the YarnChild process from hanging
+   * on low entropy systems.
+   */
+  private static final String TEST_JVM_SECURITY_EGD_OPT =
+      "-Djava.security.egd=file:/dev/./urandom";
+
+  public static Configuration initEncryptedIntermediateConfigsForTesting(
+      Configuration conf) {
+    Configuration config =
+        (conf == null) ? new Configuration(): conf;
+    final String childJVMOpts =
+        TEST_JVM_SECURITY_EGD_OPT.concat(" ")
+            .concat(config.get("mapred.child.java.opts", " "));
+    // Set the jvm arguments.
+    config.set("yarn.app.mapreduce.am.admin-command-opts",
+        TEST_JVM_SECURITY_EGD_OPT);
+    config.set("mapred.child.java.opts", childJVMOpts);
+    config.setBoolean("mapreduce.job.encrypted-intermediate-data", true);
+    return config;
+  }
+
+  /**
+   * Set local directories so that the generated folders is subdirectory of the
+   * test directories.
+   * @param conf
+   * @param testRootDir
+   * @return
+   */
+  public static Configuration setLocalDirectoriesConfigForTesting(
+      Configuration conf, File testRootDir) {
+    Configuration config =
+        (conf == null) ? new Configuration(): conf;
+    final File hadoopLocalDir = new File(testRootDir, "hadoop-dir");
+    // create the directory
+    if (!hadoopLocalDir.getAbsoluteFile().mkdirs()) {
+      LOG.info("{} directory already exists", hadoopLocalDir.getPath());
+    }
+    Path mapredHadoopTempDir = new Path(hadoopLocalDir.getPath());
+    Path mapredSystemDir = new Path(mapredHadoopTempDir, "system");
+    Path stagingDir = new Path(mapredHadoopTempDir, "tmp/staging");
+    // Set the temp directories a subdir of the test directory.
+    config.set("mapreduce.jobtracker.staging.root.dir", stagingDir.toString());
+    config.set("mapreduce.jobtracker.system.dir", mapredSystemDir.toString());
+    config.set("mapreduce.cluster.temp.dir", mapredHadoopTempDir.toString());
+    config.set("mapreduce.cluster.local.dir",
+        new Path(mapredHadoopTempDir, "local").toString());
+    return config;
+  }
 }
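
Note: the two helpers added above are meant to be combined in test setup. initEncryptedIntermediateConfigsForTesting turns on encrypted intermediate data and points the task JVMs at /dev/./urandom so they do not block on low-entropy hosts, while setLocalDirectoriesConfigForTesting re-roots the MapReduce staging, system, temp and local directories under the test's own directory. A rough pairing, mirroring what TestMerger and TestRecovery do elsewhere in this commit (the test class name here is a placeholder):

    private static File testRootDir;
    private JobConf jobConf;

    @BeforeClass
    public static void setupClass() throws Exception {
      testRootDir = GenericTestUtils.setupTestRootDir(SomeShuffleTest.class);
    }

    @Before
    public void setup() {
      jobConf = new JobConf();
      MRJobConfUtil.setLocalDirectoriesConfigForTesting(jobConf, testRootDir);
      MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(jobConf);
    }
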
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -49,43 +50,62 @@ import org.apache.hadoop.mapred.Merger;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;

 public class TestMerger {

-  private Configuration conf;
+  private static File testRootDir;
+  @Rule
+  public TestName unitTestName = new TestName();
+  private File unitTestDir;
   private JobConf jobConf;
   private FileSystem fs;

+  @BeforeClass
+  public static void setupClass() throws Exception {
+    // setup the test root directory
+    testRootDir =
+        GenericTestUtils.setupTestRootDir(
+            TestMerger.class);
+  }
+
   @Before
   public void setup() throws IOException {
-    conf = new Configuration();
+    unitTestDir = new File(testRootDir, unitTestName.getMethodName());
+    unitTestDir.mkdirs();
     jobConf = new JobConf();
-    fs = FileSystem.getLocal(conf);
+    // Set the temp directories a subdir of the test directory.
+    MRJobConfUtil.setLocalDirectoriesConfigForTesting(jobConf, unitTestDir);
+    jobConf.set(MRConfig.FRAMEWORK_NAME, "local");
+    fs = FileSystem.getLocal(jobConf);
   }


   @Test
   public void testEncryptedMerger() throws Throwable {
-    jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
-    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+    // Enable intermediate encryption.
+    MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(jobConf);
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
     TokenCache.setEncryptedSpillKey(new byte[16], credentials);
     UserGroupInformation.getCurrentUser().addCredentials(credentials);
@@ -105,8 +125,8 @@ public class TestMerger {
     LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);

     MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
-        reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
-        null, null, new Progress(), new MROutputFiles());
+        reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null,
+        null, null, null, new Progress(), new MROutputFiles());

     // write map outputs
     Map<String, String> map1 = new TreeMap<String, String>();
@@ -114,12 +134,12 @@
     map1.put("carrot", "delicious");
     Map<String, String> map2 = new TreeMap<String, String>();
     map1.put("banana", "pretty good");
-    byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
-    byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
+    byte[] mapOutputBytes1 = writeMapOutput(jobConf, map1);
+    byte[] mapOutputBytes2 = writeMapOutput(jobConf, map2);
     InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
-        conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
+        jobConf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
     InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
-        conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
+        jobConf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
     System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
         mapOutputBytes1.length);
     System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
@@ -149,12 +169,12 @@
     map3.put("carrot", "amazing");
     Map<String, String> map4 = new TreeMap<String, String>();
     map4.put("banana", "bla");
-    byte[] mapOutputBytes3 = writeMapOutput(conf, map3);
-    byte[] mapOutputBytes4 = writeMapOutput(conf, map4);
+    byte[] mapOutputBytes3 = writeMapOutput(jobConf, map3);
+    byte[] mapOutputBytes4 = writeMapOutput(jobConf, map4);
     InMemoryMapOutput<Text, Text> mapOutput3 = new InMemoryMapOutput<Text, Text>(
-        conf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
+        jobConf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
     InMemoryMapOutput<Text, Text> mapOutput4 = new InMemoryMapOutput<Text, Text>(
-        conf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
+        jobConf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
     System.arraycopy(mapOutputBytes3, 0, mapOutput3.getMemory(), 0,
         mapOutputBytes3.length);
     System.arraycopy(mapOutputBytes4, 0, mapOutput4.getMemory(), 0,
@@ -173,12 +193,13 @@
     Assert.assertEquals(2, mergeManager.onDiskMapOutputs.size());

     List<CompressAwarePath> paths = new ArrayList<CompressAwarePath>();
-    Iterator<CompressAwarePath> iterator = mergeManager.onDiskMapOutputs.iterator();
+    Iterator<CompressAwarePath> iterator =
+        mergeManager.onDiskMapOutputs.iterator();
     List<String> keys = new ArrayList<String>();
     List<String> values = new ArrayList<String>();
     while (iterator.hasNext()) {
       CompressAwarePath next = iterator.next();
-      readOnDiskMapOutput(conf, fs, next, keys, values);
+      readOnDiskMapOutput(jobConf, fs, next, keys, values);
       paths.add(next);
     }
     Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot", "apple", "banana", "carrot"));
@@ -186,8 +207,8 @@
     mergeManager.close();

     mergeManager = new MergeManagerImpl<Text, Text>(
-        reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
-        null, null, new Progress(), new MROutputFiles());
+        reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null,
+        null, null, null, new Progress(), new MROutputFiles());

     MergeThread<CompressAwarePath,Text,Text> onDiskMerger = mergeManager.createOnDiskMerger();
     onDiskMerger.merge(paths);
@@ -196,7 +217,8 @@

     keys = new ArrayList<String>();
     values = new ArrayList<String>();
-    readOnDiskMapOutput(conf, fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
+    readOnDiskMapOutput(jobConf, fs,
+        mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
     Assert.assertEquals(keys, Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot"));
     Assert.assertEquals(values, Arrays.asList("awesome", "disgusting", "pretty good", "bla", "amazing", "delicious"));

@@ -222,7 +244,8 @@

   private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
       List<String> keys, List<String> values) throws IOException {
-    FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+    FSDataInputStream in =
+        IntermediateEncryptedStream.wrapIfNecessary(conf, fs.open(path), path);

     IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
         fs.getFileStatus(path).getLen(), null, null);
@@ -252,14 +275,15 @@
   @SuppressWarnings( { "unchecked" })
   public void testMergeShouldReturnProperProgress(
       List<Segment<Text, Text>> segments) throws IOException {
-    Path tmpDir = new Path("localpath");
+    Path tmpDir = new Path(jobConf.get("mapreduce.cluster.temp.dir"),
+        "localpath");
     Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
     Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
     RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
     Counter readsCounter = new Counter();
     Counter writesCounter = new Counter();
     Progress mergePhase = new Progress();
-    RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
+    RawKeyValueIterator mergeQueue = Merger.merge(jobConf, fs, keyClass,
         valueClass, segments, 2, tmpDir, comparator, getReporter(),
         readsCounter, writesCounter, mergePhase);
     final float epsilon = 0.00001f;
@ -31,8 +31,20 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
import org.apache.hadoop.mapreduce.security.SpillCallBackPathsFinder;
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.ToolRunner;

import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.*;
import static org.junit.Assert.*;

@ -41,8 +53,39 @@ import static org.junit.Assert.*;
* -jt local -libjars
* -jt local -libjars
*/
*/
public class TestLocalJobSubmission {
public class TestLocalJobSubmission {
private static Path TEST_ROOT_DIR =
private static final Logger LOG =
new Path(System.getProperty("test.build.data","/tmp"));
LoggerFactory.getLogger(TestLocalJobSubmission.class);

private static File testRootDir;

@Rule
public TestName unitTestName = new TestName();
private File unitTestDir;
private Path jarPath;
private Configuration config;

@BeforeClass
public static void setupClass() throws Exception {
// setup the test root directory
testRootDir =
GenericTestUtils.setupTestRootDir(TestLocalJobSubmission.class);
}

@Before
public void setup() throws IOException {
unitTestDir = new File(testRootDir, unitTestName.getMethodName());
unitTestDir.mkdirs();
config = createConfig();
jarPath = makeJar(new Path(unitTestDir.getAbsolutePath(), "test.jar"));
}

private Configuration createConfig() {
// Set the temp directories a subdir of the test directory.
Configuration conf =
MRJobConfUtil.setLocalDirectoriesConfigForTesting(null, unitTestDir);
conf.set(MRConfig.FRAMEWORK_NAME, "local");
return conf;
}

/**
/**
* Test the local job submission options of -jt local -libjars.
* Test the local job submission options of -jt local -libjars.
@ -51,12 +94,9 @@ public class TestLocalJobSubmission {
*/
*/
@Test
@Test
public void testLocalJobLibjarsOption() throws IOException {
public void testLocalJobLibjarsOption() throws IOException {
Configuration conf = new Configuration();
testLocalJobLibjarsOption(config);
config.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false);
testLocalJobLibjarsOption(conf);
testLocalJobLibjarsOption(config);

conf.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false);
testLocalJobLibjarsOption(conf);
}
}

/**
/**
@ -67,8 +107,6 @@ public class TestLocalJobSubmission {
*/
*/
private void testLocalJobLibjarsOption(Configuration conf)
private void testLocalJobLibjarsOption(Configuration conf)
throws IOException {
throws IOException {
Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));

conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
conf.set(MRConfig.FRAMEWORK_NAME, "local");
conf.set(MRConfig.FRAMEWORK_NAME, "local");
final String[] args = {
final String[] args = {
@ -79,8 +117,7 @@ public class TestLocalJobSubmission {
try {
try {
res = ToolRunner.run(conf, new SleepJob(), args);
res = ToolRunner.run(conf, new SleepJob(), args);
} catch (Exception e) {
} catch (Exception e) {
System.out.println("Job failed with " + e.getLocalizedMessage());
LOG.error("Job failed with {}", e.getLocalizedMessage(), e);
e.printStackTrace(System.out);
fail("Job failed");
fail("Job failed");
}
}
assertEquals("dist job res is not 0:", 0, res);
assertEquals("dist job res is not 0:", 0, res);
@ -93,18 +130,20 @@ public class TestLocalJobSubmission {
*/
*/
@Test
@Test
public void testLocalJobEncryptedIntermediateData() throws IOException {
public void testLocalJobEncryptedIntermediateData() throws IOException {
Configuration conf = new Configuration();
config = MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(config);
conf.set(MRConfig.FRAMEWORK_NAME, "local");
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
final String[] args = {
final String[] args = {
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
};
};
int res = -1;
int res = -1;
try {
try {
res = ToolRunner.run(conf, new SleepJob(), args);
SpillCallBackPathsFinder spillInjector =
(SpillCallBackPathsFinder) IntermediateEncryptedStream
.setSpillCBInjector(new SpillCallBackPathsFinder());
res = ToolRunner.run(config, new SleepJob(), args);
Assert.assertTrue("No spill occurred",
spillInjector.getEncryptedSpilledFiles().size() > 0);
} catch (Exception e) {
} catch (Exception e) {
System.out.println("Job failed with " + e.getLocalizedMessage());
LOG.error("Job failed with {}", e.getLocalizedMessage(), e);
e.printStackTrace(System.out);
fail("Job failed");
fail("Job failed");
}
}
assertEquals("dist job res is not 0:", 0, res);
assertEquals("dist job res is not 0:", 0, res);
@ -116,15 +155,13 @@ public class TestLocalJobSubmission {
*/
*/
@Test
@Test
public void testJobMaxMapConfig() throws Exception {
public void testJobMaxMapConfig() throws Exception {
Configuration conf = new Configuration();
config.setInt(MRJobConfig.JOB_MAX_MAP, 0);
conf.set(MRConfig.FRAMEWORK_NAME, "local");
conf.setInt(MRJobConfig.JOB_MAX_MAP, 0);
final String[] args = {
final String[] args = {
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
};
};
int res = -1;
int res = -1;
try {
try {
res = ToolRunner.run(conf, new SleepJob(), args);
res = ToolRunner.run(config, new SleepJob(), args);
fail("Job should fail");
fail("Job should fail");
} catch (IllegalArgumentException e) {
} catch (IllegalArgumentException e) {
assertTrue(e.getLocalizedMessage().contains(
assertTrue(e.getLocalizedMessage().contains(
@ -139,20 +176,16 @@ public class TestLocalJobSubmission {
*/
*/
@Test
@Test
public void testLocalJobFilesOption() throws IOException {
public void testLocalJobFilesOption() throws IOException {
Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
config.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
final String[] args = {
Configuration conf = new Configuration();
"-jt", "local", "-files", jarPath.toString(),
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
conf.set(MRConfig.FRAMEWORK_NAME, "local");
};
final String[] args =
{"-jt", "local", "-files", jarPath.toString(), "-m", "1", "-r", "1",
"-mt", "1", "-rt", "1"};
int res = -1;
int res = -1;
try {
try {
res = ToolRunner.run(conf, new SleepJob(), args);
res = ToolRunner.run(config, new SleepJob(), args);
} catch (Exception e) {
} catch (Exception e) {
System.out.println("Job failed with " + e.getLocalizedMessage());
LOG.error("Job failed with {}", e.getLocalizedMessage(), e);
e.printStackTrace(System.out);
fail("Job failed");
fail("Job failed");
}
}
assertEquals("dist job res is not 0:", 0, res);
assertEquals("dist job res is not 0:", 0, res);
@ -165,27 +198,22 @@ public class TestLocalJobSubmission {
*/
*/
@Test
@Test
public void testLocalJobArchivesOption() throws IOException {
public void testLocalJobArchivesOption() throws IOException {
Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
config.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");

Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
conf.set(MRConfig.FRAMEWORK_NAME, "local");
final String[] args =
final String[] args =
{"-jt", "local", "-archives", jarPath.toString(), "-m", "1", "-r",
{"-jt", "local", "-archives", jarPath.toString(), "-m", "1", "-r",
"1", "-mt", "1", "-rt", "1"};
"1", "-mt", "1", "-rt", "1"};
int res = -1;
int res = -1;
try {
try {
res = ToolRunner.run(conf, new SleepJob(), args);
res = ToolRunner.run(config, new SleepJob(), args);
} catch (Exception e) {
} catch (Exception e) {
System.out.println("Job failed with " + e.getLocalizedMessage());
LOG.error("Job failed with {}", e.getLocalizedMessage(), e);
e.printStackTrace(System.out);
fail("Job failed");
fail("Job failed");
}
}
assertEquals("dist job res is not 0:", 0, res);
assertEquals("dist job res is not 0:", 0, res);
}
}

private Path makeJar(Path p) throws IOException {
private Path makeJar(Path p) throws IOException {
FileOutputStream fos = new FileOutputStream(new File(p.toString()));
FileOutputStream fos = new FileOutputStream(p.toString());
JarOutputStream jos = new JarOutputStream(fos);
JarOutputStream jos = new JarOutputStream(fos);
ZipEntry ze = new ZipEntry("test.jar.inside");
ZipEntry ze = new ZipEntry("test.jar.inside");
jos.putNextEntry(ze);
jos.putNextEntry(ze);

@ -1,327 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.mapred;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.apache.hadoop.io.LongWritable;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.io.OutputStreamWriter;
|
|
||||||
import java.io.Writer;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.junit.runners.Parameterized;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
|
||||||
|
|
||||||
@SuppressWarnings(value={"unchecked", "deprecation"})
|
|
||||||
/**
|
|
||||||
* This test tests the support for a merge operation in Hadoop. The input files
|
|
||||||
* are already sorted on the key. This test implements an external
|
|
||||||
* MapOutputCollector implementation that just copies the records to different
|
|
||||||
* partitions while maintaining the sort order in each partition. The Hadoop
|
|
||||||
* framework's merge on the reduce side will merge the partitions created to
|
|
||||||
* generate the final output which is sorted on the key.
|
|
||||||
*/
|
|
||||||
@RunWith(Parameterized.class)
|
|
||||||
public class TestMRIntermediateDataEncryption {
|
|
||||||
private static final Logger LOG =
|
|
||||||
LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
|
|
||||||
/**
|
|
||||||
* Use urandom to avoid the YarnChild process from hanging on low entropy
|
|
||||||
* systems.
|
|
||||||
*/
|
|
||||||
private static final String JVM_SECURITY_EGD_OPT =
|
|
||||||
"-Djava.security.egd=file:/dev/./urandom";
|
|
||||||
// Where MR job's input will reside.
|
|
||||||
private static final Path INPUT_DIR = new Path("/test/input");
|
|
||||||
// Where output goes.
|
|
||||||
private static final Path OUTPUT = new Path("/test/output");
|
|
||||||
private static final int NUM_LINES = 1000;
|
|
||||||
private static MiniMRClientCluster mrCluster = null;
|
|
||||||
private static MiniDFSCluster dfsCluster = null;
|
|
||||||
private static FileSystem fs = null;
|
|
||||||
private static final int NUM_NODES = 2;
|
|
||||||
|
|
||||||
private final String testTitle;
|
|
||||||
private final int numMappers;
|
|
||||||
private final int numReducers;
|
|
||||||
private final boolean isUber;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* List of arguments to run the JunitTest.
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
@Parameterized.Parameters(
|
|
||||||
name = "{index}: TestMRIntermediateDataEncryption.{0} .. "
|
|
||||||
+ "mappers:{1}, reducers:{2}, isUber:{3})")
|
|
||||||
public static Collection<Object[]> getTestParameters() {
|
|
||||||
return Arrays.asList(new Object[][]{
|
|
||||||
{"testSingleReducer", 3, 1, false},
|
|
||||||
{"testUberMode", 3, 1, true},
|
|
||||||
{"testMultipleMapsPerNode", 8, 1, false},
|
|
||||||
{"testMultipleReducers", 2, 4, false}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialized the parametrized JUnit test.
|
|
||||||
* @param testName the name of the unit test to be executed.
|
|
||||||
* @param mappers number of mappers in the tests.
|
|
||||||
* @param reducers number of the reducers.
|
|
||||||
* @param uberEnabled boolean flag for isUber
|
|
||||||
*/
|
|
||||||
public TestMRIntermediateDataEncryption(String testName, int mappers,
|
|
||||||
int reducers, boolean uberEnabled) {
|
|
||||||
this.testTitle = testName;
|
|
||||||
this.numMappers = mappers;
|
|
||||||
this.numReducers = reducers;
|
|
||||||
this.isUber = uberEnabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setupClass() throws Exception {
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
|
|
||||||
|
|
||||||
// Set the jvm arguments.
|
|
||||||
conf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
|
|
||||||
JVM_SECURITY_EGD_OPT);
|
|
||||||
final String childJVMOpts = JVM_SECURITY_EGD_OPT
|
|
||||||
+ " " + conf.get("mapred.child.java.opts", " ");
|
|
||||||
conf.set("mapred.child.java.opts", childJVMOpts);
|
|
||||||
|
|
||||||
|
|
||||||
// Start the mini-MR and mini-DFS clusters.
|
|
||||||
dfsCluster = new MiniDFSCluster.Builder(conf)
|
|
||||||
.numDataNodes(NUM_NODES).build();
|
|
||||||
mrCluster =
|
|
||||||
MiniMRClientClusterFactory.create(
|
|
||||||
TestMRIntermediateDataEncryption.class, NUM_NODES, conf);
|
|
||||||
mrCluster.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDown() throws IOException {
|
|
||||||
if (fs != null) {
|
|
||||||
fs.close();
|
|
||||||
}
|
|
||||||
if (mrCluster != null) {
|
|
||||||
mrCluster.stop();
|
|
||||||
}
|
|
||||||
if (dfsCluster != null) {
|
|
||||||
dfsCluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setup() throws Exception {
|
|
||||||
LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", testTitle);
|
|
||||||
fs = dfsCluster.getFileSystem();
|
|
||||||
if (fs.exists(INPUT_DIR) && !fs.delete(INPUT_DIR, true)) {
|
|
||||||
throw new IOException("Could not delete " + INPUT_DIR);
|
|
||||||
}
|
|
||||||
if (fs.exists(OUTPUT) && !fs.delete(OUTPUT, true)) {
|
|
||||||
throw new IOException("Could not delete " + OUTPUT);
|
|
||||||
}
|
|
||||||
// Generate input.
|
|
||||||
createInput(fs, numMappers, NUM_LINES);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void cleanup() throws IOException {
|
|
||||||
if (fs != null) {
|
|
||||||
if (fs.exists(OUTPUT)) {
|
|
||||||
fs.delete(OUTPUT, true);
|
|
||||||
}
|
|
||||||
if (fs.exists(INPUT_DIR)) {
|
|
||||||
fs.delete(INPUT_DIR, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=600000)
|
|
||||||
public void testMerge() throws Exception {
|
|
||||||
JobConf job = new JobConf(mrCluster.getConfig());
|
|
||||||
job.setJobName("Test");
|
|
||||||
JobClient client = new JobClient(job);
|
|
||||||
RunningJob submittedJob = null;
|
|
||||||
FileInputFormat.setInputPaths(job, INPUT_DIR);
|
|
||||||
FileOutputFormat.setOutputPath(job, OUTPUT);
|
|
||||||
job.set("mapreduce.output.textoutputformat.separator", " ");
|
|
||||||
job.setInputFormat(TextInputFormat.class);
|
|
||||||
job.setMapOutputKeyClass(Text.class);
|
|
||||||
job.setMapOutputValueClass(Text.class);
|
|
||||||
job.setOutputKeyClass(Text.class);
|
|
||||||
job.setOutputValueClass(Text.class);
|
|
||||||
job.setMapperClass(TestMRIntermediateDataEncryption.MyMapper.class);
|
|
||||||
job.setPartitionerClass(
|
|
||||||
TestMRIntermediateDataEncryption.MyPartitioner.class);
|
|
||||||
job.setOutputFormat(TextOutputFormat.class);
|
|
||||||
job.setNumReduceTasks(numReducers);
|
|
||||||
job.setInt("mapreduce.map.maxattempts", 1);
|
|
||||||
job.setInt("mapreduce.reduce.maxattempts", 1);
|
|
||||||
job.setInt("mapred.test.num_lines", NUM_LINES);
|
|
||||||
job.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
|
|
||||||
job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
|
|
||||||
submittedJob = client.submitJob(job);
|
|
||||||
submittedJob.waitForCompletion();
|
|
||||||
assertTrue("The submitted job is completed", submittedJob.isComplete());
|
|
||||||
assertTrue("The submitted job is successful", submittedJob.isSuccessful());
|
|
||||||
verifyOutput(fs, numMappers, NUM_LINES);
|
|
||||||
client.close();
|
|
||||||
// wait for short period to cool down.
|
|
||||||
Thread.sleep(1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createInput(FileSystem filesystem, int mappers, int numLines)
|
|
||||||
throws Exception {
|
|
||||||
for (int i = 0; i < mappers; i++) {
|
|
||||||
OutputStream os =
|
|
||||||
filesystem.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
|
|
||||||
Writer writer = new OutputStreamWriter(os);
|
|
||||||
for (int j = 0; j < numLines; j++) {
|
|
||||||
// Create sorted key, value pairs.
|
|
||||||
int k = j + 1;
|
|
||||||
String formattedNumber = String.format("%09d", k);
|
|
||||||
writer.write(formattedNumber + " " + formattedNumber + "\n");
|
|
||||||
}
|
|
||||||
writer.close();
|
|
||||||
os.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void verifyOutput(FileSystem fileSystem,
|
|
||||||
int mappers, int numLines)
|
|
||||||
throws Exception {
|
|
||||||
FSDataInputStream dis = null;
|
|
||||||
long numValidRecords = 0;
|
|
||||||
long numInvalidRecords = 0;
|
|
||||||
String prevKeyValue = "000000000";
|
|
||||||
Path[] fileList =
|
|
||||||
FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
|
|
||||||
new Utils.OutputFileUtils.OutputFilesFilter()));
|
|
||||||
for (Path outFile : fileList) {
|
|
||||||
try {
|
|
||||||
dis = fileSystem.open(outFile);
|
|
||||||
String record;
|
|
||||||
while((record = dis.readLine()) != null) {
|
|
||||||
// Split the line into key and value.
|
|
||||||
int blankPos = record.indexOf(" ");
|
|
||||||
String keyString = record.substring(0, blankPos);
|
|
||||||
String valueString = record.substring(blankPos+1);
|
|
||||||
// Check for sorted output and correctness of record.
|
|
||||||
if (keyString.compareTo(prevKeyValue) >= 0
|
|
||||||
&& keyString.equals(valueString)) {
|
|
||||||
prevKeyValue = keyString;
|
|
||||||
numValidRecords++;
|
|
||||||
} else {
|
|
||||||
numInvalidRecords++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (dis != null) {
|
|
||||||
dis.close();
|
|
||||||
dis = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Make sure we got all input records in the output in sorted order.
|
|
||||||
assertEquals((long)(mappers * numLines), numValidRecords);
|
|
||||||
// Make sure there is no extraneous invalid record.
|
|
||||||
assertEquals(0, numInvalidRecords);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A mapper implementation that assumes that key text contains valid integers
|
|
||||||
* in displayable form.
|
|
||||||
*/
|
|
||||||
public static class MyMapper extends MapReduceBase
|
|
||||||
implements Mapper<LongWritable, Text, Text, Text> {
|
|
||||||
private Text keyText;
|
|
||||||
private Text valueText;
|
|
||||||
|
|
||||||
public MyMapper() {
|
|
||||||
keyText = new Text();
|
|
||||||
valueText = new Text();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void map(LongWritable key, Text value,
|
|
||||||
OutputCollector<Text, Text> output,
|
|
||||||
Reporter reporter) throws IOException {
|
|
||||||
String record = value.toString();
|
|
||||||
int blankPos = record.indexOf(" ");
|
|
||||||
keyText.set(record.substring(0, blankPos));
|
|
||||||
valueText.set(record.substring(blankPos + 1));
|
|
||||||
output.collect(keyText, valueText);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void close() throws IOException {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Partitioner implementation to make sure that output is in total sorted
|
|
||||||
* order. We basically route key ranges to different reducers such that
|
|
||||||
* key values monotonically increase with the partition number. For example,
|
|
||||||
* in this test, the keys are numbers from 1 to 1000 in the form "000000001"
|
|
||||||
* to "000001000" in each input file. The keys "000000001" to "000000250" are
|
|
||||||
* routed to partition 0, "000000251" to "000000500" are routed to partition 1
|
|
||||||
* and so on since we have 4 reducers.
|
|
||||||
*/
|
|
||||||
static class MyPartitioner implements Partitioner<Text, Text> {
|
|
||||||
|
|
||||||
private JobConf job;
|
|
||||||
|
|
||||||
public MyPartitioner() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public void configure(JobConf job) {
|
|
||||||
this.job = job;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getPartition(Text key, Text value, int numPartitions) {
|
|
||||||
int keyValue = 0;
|
|
||||||
try {
|
|
||||||
keyValue = Integer.parseInt(key.toString());
|
|
||||||
} catch (NumberFormatException nfe) {
|
|
||||||
keyValue = 0;
|
|
||||||
}
|
|
||||||
int partitionNumber = (numPartitions * (Math.max(0, keyValue - 1))) / job
|
|
||||||
.getInt("mapred.test.num_lines", 10000);
|
|
||||||
return partitionNumber;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
import org.junit.Test;

@ -79,7 +80,8 @@ public class TestMROpportunisticMaps {
MiniMRClientCluster mrCluster = null;
MiniMRClientCluster mrCluster = null;
FileSystem fileSystem = null;
FileSystem fileSystem = null;
try {
try {
Configuration conf = new Configuration();
Configuration conf =
MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
// Start the mini-MR and mini-DFS clusters
// Start the mini-MR and mini-DFS clusters
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
conf.setBoolean(YarnConfiguration.
conf.setBoolean(YarnConfiguration.
@ -149,7 +151,6 @@ public class TestMROpportunisticMaps {
job.setInt("mapreduce.map.maxattempts", 1);
job.setInt("mapreduce.map.maxattempts", 1);
job.setInt("mapreduce.reduce.maxattempts", 1);
job.setInt("mapreduce.reduce.maxattempts", 1);
job.setInt("mapred.test.num_lines", numLines);
job.setInt("mapred.test.num_lines", numLines);
job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
try {
try {
submittedJob = client.submitJob(job);
submittedJob = client.submitJob(job);
try {
try {

@ -87,12 +87,12 @@ public class TestMerge {
// Run the test.
// Run the test.
runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem);
runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem);
} finally {
} finally {
if (dfsCluster != null) {
dfsCluster.shutdown();
}
if (mrCluster != null) {
if (mrCluster != null) {
mrCluster.stop();
mrCluster.stop();
}
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
}
}
}
}

@ -22,7 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.ArrayList;
import java.util.Date;
import java.util.Date;
import java.util.List;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configured;
@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.Tool;
@ -99,6 +98,15 @@ public class RandomTextWriter extends Configured implements Tool {
*/
*/
enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }

public static String generateSentenceWithRand(ThreadLocalRandom rand,
int noWords) {
StringBuffer sentence = new StringBuffer(words[rand.nextInt(words.length)]);
for (int i = 1; i < noWords; i++) {
sentence.append(" ").append(words[rand.nextInt(words.length)]);
}
return sentence.toString();
}

static class RandomTextMapper extends Mapper<Text, Text, Text, Text> {
static class RandomTextMapper extends Mapper<Text, Text, Text, Text> {

private long numBytesToWrite;
private long numBytesToWrite;
@ -106,7 +114,6 @@ public class RandomTextWriter extends Configured implements Tool {
private int wordsInKeyRange;
private int wordsInKeyRange;
private int minWordsInValue;
private int minWordsInValue;
private int wordsInValueRange;
private int wordsInValueRange;
private Random random = new Random();

/**
/**
* Save the configuration value that we need to write the data.
* Save the configuration value that we need to write the data.
@ -127,12 +134,13 @@ public class RandomTextWriter extends Configured implements Tool {
public void map(Text key, Text value,
public void map(Text key, Text value,
Context context) throws IOException,InterruptedException {
Context context) throws IOException,InterruptedException {
int itemCount = 0;
int itemCount = 0;
ThreadLocalRandom rand = ThreadLocalRandom.current();
while (numBytesToWrite > 0) {
while (numBytesToWrite > 0) {
// Generate the key/value
// Generate the key/value
int noWordsKey = minWordsInKey +
int noWordsKey = minWordsInKey +
(wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0);
(wordsInKeyRange != 0 ? rand.nextInt(wordsInKeyRange) : 0);
int noWordsValue = minWordsInValue +
int noWordsValue = minWordsInValue +
(wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0);
(wordsInValueRange != 0 ? rand.nextInt(wordsInValueRange) : 0);
Text keyWords = generateSentence(noWordsKey);
Text keyWords = generateSentence(noWordsKey);
Text valueWords = generateSentence(noWordsValue);
Text valueWords = generateSentence(noWordsValue);

@ -154,13 +162,9 @@ public class RandomTextWriter extends Configured implements Tool {
}
}

private Text generateSentence(int noWords) {
private Text generateSentence(int noWords) {
StringBuffer sentence = new StringBuffer();
String sentence =
String space = " ";
generateSentenceWithRand(ThreadLocalRandom.current(), noWords);
for (int i=0; i < noWords; ++i) {
return new Text(sentence);
sentence.append(words[random.nextInt(words.length)]);
sentence.append(space);
}
return new Text(sentence.toString());
}
}
}
}

@ -0,0 +1,411 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce;
|
||||||
|
|
||||||
|
import java.io.BufferedWriter;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.StringTokenizer;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapred.MiniMRClientCluster;
|
||||||
|
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
|
||||||
|
import org.apache.hadoop.mapred.Utils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
|
||||||
|
import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
|
||||||
|
import org.apache.hadoop.mapreduce.security.SpillCallBackPathsFinder;
|
||||||
|
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class tests the support of Intermediate data encryption
|
||||||
|
* (Spill data encryption).
|
||||||
|
* It starts by generating random input text file ({@link RandomTextWriter})
|
||||||
|
* using the {@link ToolRunner}.
|
||||||
|
* A wordCount job consumes the generated input. The final job is configured in
|
||||||
|
* a way to guarantee that data is spilled.
|
||||||
|
* mbs-per-map specifies the amount of data (in MBs) to generate per map.
|
||||||
|
* By default, this is twice the value of <code>mapreduce.task.io.sort.mb</code>
|
||||||
|
* <code>map-tasks</code> specifies the number of map tasks to run.
|
||||||
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class TestMRIntermediateDataEncryption {
|
||||||
|
/**
|
||||||
|
* The number of bytes generated by the input generator.
|
||||||
|
*/
|
||||||
|
public static final long TOTAL_MBS_DEFAULT = 128L;
|
||||||
|
public static final long BLOCK_SIZE_DEFAULT = 32 * 1024 * 1024L;
|
||||||
|
public static final int INPUT_GEN_NUM_THREADS = 16;
|
||||||
|
public static final long TASK_SORT_IO_MB_DEFAULT = 128L;
|
||||||
|
public static final String JOB_DIR_PATH = "jobs-data-path";
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
|
||||||
|
/**
|
||||||
|
* Directory of the test data.
|
||||||
|
*/
|
||||||
|
private static File testRootDir;
|
||||||
|
private static volatile BufferedWriter inputBufferedWriter;
|
||||||
|
private static Configuration commonConfig;
|
||||||
|
private static MiniDFSCluster dfsCluster;
|
||||||
|
private static MiniMRClientCluster mrCluster;
|
||||||
|
private static FileSystem fs;
|
||||||
|
private static Path jobInputDirPath;
|
||||||
|
private static long inputFileSize;
|
||||||
|
/**
|
||||||
|
* Test parameters.
|
||||||
|
*/
|
||||||
|
private final String testTitleName;
|
||||||
|
private final int numMappers;
|
||||||
|
private final int numReducers;
|
||||||
|
private final boolean isUber;
|
||||||
|
private Configuration config;
|
||||||
|
private Path jobOutputPath;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialized the parametrized JUnit test.
|
||||||
|
* @param testName the name of the unit test to be executed.
|
||||||
|
* @param mappers number of mappers in the tests.
|
||||||
|
* @param reducers number of the reducers.
|
||||||
|
* @param uberEnabled boolean flag for isUber
|
||||||
|
*/
|
||||||
|
public TestMRIntermediateDataEncryption(String testName, int mappers,
|
||||||
|
int reducers, boolean uberEnabled) {
|
||||||
|
this.testTitleName = testName;
|
||||||
|
this.numMappers = mappers;
|
||||||
|
this.numReducers = reducers;
|
||||||
|
this.isUber = uberEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List of arguments to run the JunitTest.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@Parameterized.Parameters(
|
||||||
|
name = "{index}: TestMRIntermediateDataEncryption.{0} .. "
|
||||||
|
+ "mappers:{1}, reducers:{2}, isUber:{3})")
|
||||||
|
public static Collection<Object[]> getTestParameters() {
|
||||||
|
return Arrays.asList(new Object[][]{
|
||||||
|
{"testSingleReducer", 3, 1, false},
|
||||||
|
{"testUberMode", 3, 1, true},
|
||||||
|
{"testMultipleMapsPerNode", 8, 1, false},
|
||||||
|
// TODO: The following configuration is commented out until
|
||||||
|
// MAPREDUCE-7325 is fixed.
|
||||||
|
// Setting multiple reducers breaks LocalJobRunner causing the
|
||||||
|
// unit test to fail.
|
||||||
|
// {"testMultipleReducers", 2, 4, false}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupClass() throws Exception {
|
||||||
|
// setup the test root directory
|
||||||
|
testRootDir =
|
||||||
|
GenericTestUtils.setupTestRootDir(
|
||||||
|
TestMRIntermediateDataEncryption.class);
|
||||||
|
// setup the base configurations and the clusters
|
||||||
|
final File dfsFolder = new File(testRootDir, "dfs");
|
||||||
|
final Path jobsDirPath = new Path(JOB_DIR_PATH);
|
||||||
|
|
||||||
|
commonConfig = createBaseConfiguration();
|
||||||
|
dfsCluster =
|
||||||
|
new MiniDFSCluster.Builder(commonConfig, dfsFolder)
|
||||||
|
.numDataNodes(2).build();
|
||||||
|
dfsCluster.waitActive();
|
||||||
|
mrCluster = MiniMRClientClusterFactory.create(
|
||||||
|
TestMRIntermediateDataEncryption.class, 2, commonConfig);
|
||||||
|
mrCluster.start();
|
||||||
|
fs = dfsCluster.getFileSystem();
|
||||||
|
if (fs.exists(jobsDirPath) && !fs.delete(jobsDirPath, true)) {
|
||||||
|
throw new IOException("Could not delete JobsDirPath" + jobsDirPath);
|
||||||
|
}
|
||||||
|
fs.mkdirs(jobsDirPath);
|
||||||
|
jobInputDirPath = new Path(jobsDirPath, "in-dir");
|
||||||
|
// run the input generator job.
|
||||||
|
Assert.assertEquals("Generating input should succeed", 0,
|
||||||
|
generateInputTextFile());
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws IOException {
|
||||||
|
// shutdown clusters
|
||||||
|
if (mrCluster != null) {
|
||||||
|
mrCluster.stop();
|
||||||
|
}
|
||||||
|
if (dfsCluster != null) {
|
||||||
|
dfsCluster.shutdown();
|
||||||
|
}
|
||||||
|
// make sure that generated input file is deleted
|
||||||
|
final File textInputFile = new File(testRootDir, "input.txt");
|
||||||
|
if (textInputFile.exists()) {
|
||||||
|
textInputFile.delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a configuration object setting the common properties before
|
||||||
|
* initializing the clusters.
|
||||||
|
* @return configuration to be used as a base for the unit tests.
|
||||||
|
*/
|
||||||
|
private static Configuration createBaseConfiguration() {
|
||||||
|
// Set the jvm arguments to enable intermediate encryption.
|
||||||
|
Configuration conf =
|
||||||
|
MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
|
||||||
|
// Set the temp directories a subdir of the test directory.
|
||||||
|
conf = MRJobConfUtil.setLocalDirectoriesConfigForTesting(conf, testRootDir);
|
||||||
|
conf.setLong("dfs.blocksize", BLOCK_SIZE_DEFAULT);
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a thread safe BufferedWriter to be used among the task generators.
|
||||||
|
* @return A synchronized <code>BufferedWriter</code> to the input file.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private static synchronized BufferedWriter getTextInputWriter()
|
||||||
|
throws IOException {
|
||||||
|
if (inputBufferedWriter == null) {
|
||||||
|
final File textInputFile = new File(testRootDir, "input.txt");
|
||||||
|
inputBufferedWriter = new BufferedWriter(new FileWriter(textInputFile));
|
||||||
|
}
|
||||||
|
return inputBufferedWriter;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates input text file of size <code>TOTAL_MBS_DEFAULT</code>.
|
||||||
|
* It creates a total <code>INPUT_GEN_NUM_THREADS</code> future tasks.
|
||||||
|
*
|
||||||
|
* @return the result of the input generation. 0 for success.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private static int generateInputTextFile() throws Exception {
|
||||||
|
final File textInputFile = new File(testRootDir, "input.txt");
|
||||||
|
final AtomicLong actualWrittenBytes = new AtomicLong(0);
|
||||||
|
// create INPUT_GEN_NUM_THREADS callables
|
||||||
|
final ExecutorService executor =
|
||||||
|
Executors.newFixedThreadPool(INPUT_GEN_NUM_THREADS);
|
||||||
|
//create a list to hold the Future object associated with Callable
|
||||||
|
final List<Future<Long>> inputGenerators = new ArrayList<>();
|
||||||
|
final Callable<Long> callableGen = new InputGeneratorTask();
|
||||||
|
final long startTime = Time.monotonicNow();
|
||||||
|
for (int i = 0; i < INPUT_GEN_NUM_THREADS; i++) {
|
||||||
|
//submit Callable tasks to be executed by thread pool
|
||||||
|
Future<Long> genFutureTask = executor.submit(callableGen);
|
||||||
|
inputGenerators.add(genFutureTask);
|
||||||
|
}
|
||||||
|
for (Future<Long> genFutureTask : inputGenerators) {
|
||||||
|
// print the return value of Future, notice the output delay in console
|
||||||
|
// because Future.get() waits for task to get completed
|
||||||
|
LOG.info("Received one task. Current total bytes: {}",
|
||||||
|
actualWrittenBytes.addAndGet(genFutureTask.get()));
|
||||||
|
}
|
||||||
|
getTextInputWriter().close();
|
||||||
|
final long endTime = Time.monotonicNow();
|
||||||
|
LOG.info("Finished generating input. Wrote {} bytes in {} seconds",
|
||||||
|
actualWrittenBytes.get(), ((endTime - startTime) * 1.0) / 1000);
|
||||||
|
executor.shutdown();
|
||||||
|
// copy text file to HDFS deleting the source.
|
||||||
|
fs.mkdirs(jobInputDirPath);
|
||||||
|
Path textInputPath =
|
||||||
|
fs.makeQualified(new Path(jobInputDirPath, "input.txt"));
|
||||||
|
fs.copyFromLocalFile(true, new Path(textInputFile.getAbsolutePath()),
|
||||||
|
textInputPath);
|
||||||
|
if (!fs.exists(textInputPath)) {
|
||||||
|
// the file was not generated. Fail.
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
// update the input size.
|
||||||
|
FileStatus[] fileStatus =
|
||||||
|
fs.listStatus(textInputPath);
|
||||||
|
inputFileSize = fileStatus[0].getLen();
|
||||||
|
LOG.info("Text input file; path: {}, size: {}",
|
||||||
|
textInputPath, inputFileSize);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
LOG.info("Starting TestMRIntermediateDataEncryption#{}.......",
|
||||||
|
testTitleName);
|
||||||
|
final Path jobDirPath = new Path(JOB_DIR_PATH, testTitleName);
|
||||||
|
if (fs.exists(jobDirPath) && !fs.delete(jobDirPath, true)) {
|
||||||
|
throw new IOException("Could not delete " + jobDirPath);
|
||||||
|
}
|
||||||
|
fs.mkdirs(jobDirPath);
|
||||||
|
jobOutputPath = new Path(jobDirPath, "out-dir");
|
||||||
|
// Set the configuration for the job.
|
||||||
|
config = new Configuration(commonConfig);
|
||||||
|
config.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
|
||||||
|
config.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
|
||||||
|
// set the configuration to make sure that we get spilled files
|
||||||
|
long ioSortMb = TASK_SORT_IO_MB_DEFAULT;
|
||||||
|
config.setLong(MRJobConfig.IO_SORT_MB, ioSortMb);
|
||||||
|
long mapMb = Math.max(2 * ioSortMb, config.getInt(MRJobConfig.MAP_MEMORY_MB,
|
||||||
|
MRJobConfig.DEFAULT_MAP_MEMORY_MB));
|
||||||
|
// make sure the map tasks will spill to disk.
|
||||||
|
config.setLong(MRJobConfig.MAP_MEMORY_MB, mapMb);
|
||||||
|
config.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
|
||||||
|
config.setInt(MRJobConfig.NUM_MAPS, numMappers);
|
||||||
|
// max attempts have to be set to 1 when intermediate encryption is enabled.
|
||||||
|
config.setInt("mapreduce.map.maxattempts", 1);
|
||||||
|
config.setInt("mapreduce.reduce.maxattempts", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWordCount() throws Exception {
|
||||||
|
LOG.info("........Starting main Job Driver #{} starting at {}.......",
|
||||||
|
testTitleName, Time.formatTime(System.currentTimeMillis()));
|
||||||
|
Job job = Job.getInstance(config);
|
||||||
|
job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, numMappers);
|
||||||
|
job.setJarByClass(TestMRIntermediateDataEncryption.class);
|
||||||
|
job.setJobName("mr-spill-" + testTitleName);
|
||||||
|
// Mapper configuration
|
||||||
|
job.setMapperClass(TokenizerMapper.class);
|
||||||
|
job.setInputFormatClass(TextInputFormat.class);
|
||||||
|
job.setCombinerClass(LongSumReducer.class);
|
||||||
|
FileInputFormat.setMinInputSplitSize(job,
|
||||||
|
(inputFileSize + numMappers) / numMappers);
|
||||||
|
// Reducer configuration
|
||||||
|
job.setReducerClass(LongSumReducer.class);
|
||||||
|
job.setNumReduceTasks(numReducers);
|
||||||
|
job.setOutputKeyClass(Text.class);
|
||||||
|
job.setOutputValueClass(LongWritable.class);
|
||||||
|
// Set the IO paths for the job.
|
||||||
|
FileInputFormat.addInputPath(job, jobInputDirPath);
|
||||||
|
FileOutputFormat.setOutputPath(job, jobOutputPath);
|
||||||
|
SpillCallBackPathsFinder spillInjector =
|
||||||
|
(SpillCallBackPathsFinder) IntermediateEncryptedStream
|
||||||
|
.setSpillCBInjector(new SpillCallBackPathsFinder());
|
||||||
|
StringBuilder testSummary =
|
||||||
|
new StringBuilder(String.format("%n ===== test %s summary ======",
|
||||||
|
testTitleName));
|
||||||
|
try {
|
||||||
|
long startTime = Time.monotonicNow();
|
||||||
|
testSummary.append(String.format("%nJob %s ended at %s",
|
||||||
|
testTitleName, Time.formatTime(System.currentTimeMillis())));
|
||||||
|
Assert.assertTrue(job.waitForCompletion(true));
|
||||||
|
long endTime = Time.monotonicNow();
|
||||||
|
testSummary.append(String.format("%nJob %s ended at %s",
|
||||||
|
job.getJobName(), Time.formatTime(System.currentTimeMillis())));
|
||||||
|
testSummary.append(String.format("%n\tThe job took %.3f seconds",
|
||||||
|
(1.0 * (endTime - startTime)) / 1000));
|
||||||
|
long spilledRecords =
|
||||||
|
job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
|
||||||
|
Assert.assertFalse(
|
||||||
|
"The encrypted spilled files should not be empty.",
|
||||||
|
spillInjector.getEncryptedSpilledFiles().isEmpty());
|
||||||
|
Assert.assertTrue("Spill records must be greater than 0",
|
||||||
|
spilledRecords > 0);
|
||||||
|
Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
|
||||||
|
fs.exists(jobOutputPath));
|
||||||
|
Assert.assertTrue("Invalid access to spill file positions",
|
||||||
|
spillInjector.getInvalidSpillEntries().isEmpty());
|
||||||
|
FileStatus[] fileStatus =
|
||||||
|
fs.listStatus(jobOutputPath,
|
||||||
|
new Utils.OutputFileUtils.OutputFilesFilter());
|
||||||
|
for (FileStatus fStatus : fileStatus) {
|
||||||
|
long fileSize = fStatus.getLen();
|
||||||
|
testSummary.append(
|
||||||
|
String.format("%n\tOutput file %s: %d",
|
||||||
|
fStatus.getPath(), fileSize));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
testSummary.append(spillInjector.getSpilledFileReport());
|
||||||
|
LOG.info(testSummary.toString());
|
||||||
|
IntermediateEncryptedStream.resetSpillCBInjector();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A callable implementation that generates a portion of the
|
||||||
|
* <code>TOTAL_MBS_DEFAULT</code> into {@link #inputBufferedWriter}.
|
||||||
|
*/
|
||||||
|
static class InputGeneratorTask implements Callable<Long> {
|
||||||
|
@Override
|
||||||
|
public Long call() throws Exception {
|
||||||
|
long bytesWritten = 0;
|
||||||
|
final ThreadLocalRandom rand = ThreadLocalRandom.current();
|
||||||
|
final long totalBytes = 1024 * 1024 * TOTAL_MBS_DEFAULT;
|
||||||
|
final long bytesPerTask = totalBytes / INPUT_GEN_NUM_THREADS;
|
||||||
|
final String newLine = System.lineSeparator();
|
||||||
|
final BufferedWriter writer = getTextInputWriter();
|
||||||
|
while (bytesWritten < bytesPerTask) {
|
||||||
|
String sentence =
|
||||||
|
RandomTextWriter.generateSentenceWithRand(rand, rand.nextInt(5, 20))
|
||||||
|
.concat(newLine);
|
||||||
|
writer.write(sentence);
|
||||||
|
bytesWritten += sentence.length();
|
||||||
|
}
|
||||||
|
writer.flush();
|
||||||
|
LOG.info("Task {} finished. Wrote {} bytes.",
|
||||||
|
Thread.currentThread().getName(), bytesWritten);
|
||||||
|
return bytesWritten;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Test tokenizer Mapper.
|
||||||
|
*/
|
||||||
|
public static class TokenizerMapper
|
||||||
|
extends Mapper<Object, Text, Text, LongWritable> {
|
||||||
|
|
||||||
|
private final static LongWritable ONE = new LongWritable(1);
|
||||||
|
private final Text word = new Text();
|
||||||
|
|
||||||
|
public void map(Object key, Text value,
|
||||||
|
Context context) throws IOException, InterruptedException {
|
||||||
|
StringTokenizer itr = new StringTokenizer(value.toString());
|
||||||
|
while (itr.hasMoreTokens()) {
|
||||||
|
word.set(itr.nextToken());
|
||||||
|
context.write(word, ONE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|