From 299b8062f133f93f0fcd069cca169d63c37c177c Mon Sep 17 00:00:00 2001 From: Jim Brennan Date: Mon, 15 Mar 2021 20:13:17 +0000 Subject: [PATCH] MAPREDUCE-7322. revisiting TestMRIntermediateDataEncryption. Contributed by Ahmed Hussein. --- .../org/apache/hadoop/util/JarFinder.java | 5 +- .../hadoop/mapreduce/v2/app/TestRecovery.java | 23 +- .../org/apache/hadoop/mapred/BackupStore.java | 5 +- .../org/apache/hadoop/mapred/MapTask.java | 34 +- .../java/org/apache/hadoop/mapred/Merger.java | 6 +- .../security/IntermediateEncryptedStream.java | 89 ++++ .../security/SpillCallBackInjector.java | 86 ++++ .../security/SpillCallBackPathsFinder.java | 193 ++++++++ .../mapreduce/security/package-info.java | 25 ++ .../hadoop/mapreduce/task/reduce/Fetcher.java | 5 +- .../mapreduce/task/reduce/LocalFetcher.java | 6 +- .../task/reduce/MergeManagerImpl.java | 14 +- .../task/reduce/OnDiskMapOutput.java | 5 +- .../hadoop/mapreduce/util/MRJobConfUtil.java | 58 +++ .../mapreduce/task/reduce/TestMerger.java | 81 ++-- .../hadoop/mapred/TestLocalJobSubmission.java | 112 +++-- .../TestMRIntermediateDataEncryption.java | 327 -------------- .../mapred/TestMROpportunisticMaps.java | 5 +- .../org/apache/hadoop/mapred/TestMerge.java | 6 +- .../hadoop/mapreduce/RandomTextWriter.java | 32 +- .../TestMRIntermediateDataEncryption.java | 411 ++++++++++++++++++ 21 files changed, 1081 insertions(+), 447 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/IntermediateEncryptedStream.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackInjector.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/package-info.java delete mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java index 3f1bb2d61dc..85d95738b5e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java @@ -165,10 +165,11 @@ public class JarFinder { if (!testDir.exists()) { testDir.mkdirs(); } - File tempJar = File.createTempFile("hadoop-", "", testDir); - tempJar = new File(tempJar.getAbsolutePath() + ".jar"); + File tempFile = File.createTempFile("hadoop-", "", testDir); + File tempJar = new File(tempFile.getAbsolutePath() + ".jar"); createJar(baseDir, tempJar); tempJar.deleteOnExit(); + tempFile.deleteOnExit(); return tempJar.getAbsolutePath(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 00c9b3aeefc..5a23b58875a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.mapreduce.util.MRJobConfUtil; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.junit.Assert; @@ -105,6 +106,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; + +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.slf4j.Logger; @@ -114,15 +117,24 @@ import org.slf4j.LoggerFactory; public class TestRecovery { private static final Logger LOG = LoggerFactory.getLogger(TestRecovery.class); - private static Path outputDir = new Path(new File("target", - TestRecovery.class.getName()).getAbsolutePath() + - Path.SEPARATOR + "out"); + + private static File testRootDir; + private static Path outputDir; private static String partFile = "part-r-00000"; private Text key1 = new Text("key1"); private Text key2 = new Text("key2"); private Text val1 = new Text("val1"); private Text val2 = new Text("val2"); + @BeforeClass + public static void setupClass() throws Exception { + // setup the test root directory + testRootDir = + GenericTestUtils.setupTestRootDir( + TestRecovery.class); + outputDir = new Path(testRootDir.getAbsolutePath(), "out"); + } + /** * AM with 2 maps and 1 reduce. 
For 1st map, one attempt fails, one attempt * completely disappears because of failed launch, one attempt gets killed and @@ -600,14 +612,13 @@ public class TestRecovery { MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), true, ++runCount) { }; - Configuration conf = new Configuration(); + Configuration conf = + MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null); conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); conf.setBoolean("mapred.mapper.new-api", true); conf.setBoolean("mapred.reducer.new-api", true); conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); - conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); - // run the MR job at the first attempt Job jobAttempt1 = app.submit(conf); app.waitForState(jobAttempt1, JobState.RUNNING); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java index 94ad9e0187e..5bd26883af0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java @@ -42,7 +42,8 @@ import org.apache.hadoop.mapred.Merger.Segment; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.CryptoUtils; +import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -576,7 +577,7 @@ public class BackupStore { file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(), -1, conf); FSDataOutputStream out = fs.create(file); - out = CryptoUtils.wrapIfNecessary(conf, out); + out = IntermediateEncryptedStream.wrapIfNecessary(conf, out, tmp); return new Writer(conf, out, null, null, null, null, true); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index 17461b196b3..fa4396d77f4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -63,6 +63,7 @@ import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter; import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter; +import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex; import org.apache.hadoop.mapreduce.task.MapContextImpl; import org.apache.hadoop.mapreduce.CryptoUtils; @@ -1630,7 +1631,9 @@ public class MapTask extends Task { IFile.Writer writer = null; try { long segmentStart = out.getPos(); - partitionOut = CryptoUtils.wrapIfNecessary(job, out, false); + partitionOut = + IntermediateEncryptedStream.wrapIfNecessary(job, out, 
false, + filename); writer = new Writer(job, partitionOut, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null) { @@ -1687,6 +1690,7 @@ public class MapTask extends Task { Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); + IntermediateEncryptedStream.addSpillIndexFile(indexFilename, job); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); @@ -1727,7 +1731,9 @@ public class MapTask extends Task { try { long segmentStart = out.getPos(); // Create a new codec, don't care! - partitionOut = CryptoUtils.wrapIfNecessary(job, out, false); + partitionOut = + IntermediateEncryptedStream.wrapIfNecessary(job, out, false, + filename); writer = new IFile.Writer(job, partitionOut, keyClass, valClass, codec, spilledRecordsCounter); @@ -1761,6 +1767,7 @@ public class MapTask extends Task { Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); + IntermediateEncryptedStream.addSpillIndexFile(indexFilename, job); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); @@ -1854,15 +1861,19 @@ public class MapTask extends Task { finalOutFileSize += rfs.getFileStatus(filename[i]).getLen(); } if (numSpills == 1) { //the spill is the final output + Path indexFileOutput = + mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]); sameVolRename(filename[0], mapOutputFile.getOutputFileForWriteInVolume(filename[0])); if (indexCacheList.size() == 0) { - sameVolRename(mapOutputFile.getSpillIndexFile(0), - mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0])); + Path indexFilePath = mapOutputFile.getSpillIndexFile(0); + IntermediateEncryptedStream.validateSpillIndexFile( + indexFilePath, job); + sameVolRename(indexFilePath, indexFileOutput); } else { - indexCacheList.get(0).writeToFile( - mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job); + indexCacheList.get(0).writeToFile(indexFileOutput, job); } + IntermediateEncryptedStream.addSpillIndexFile(indexFileOutput, job); sortPhase.complete(); return; } @@ -1870,6 +1881,7 @@ public class MapTask extends Task { // read in paged indices for (int i = indexCacheList.size(); i < numSpills; ++i) { Path indexFileName = mapOutputFile.getSpillIndexFile(i); + IntermediateEncryptedStream.validateSpillIndexFile(indexFileName, job); indexCacheList.add(new SpillRecord(indexFileName, job)); } @@ -1881,7 +1893,7 @@ public class MapTask extends Task { mapOutputFile.getOutputFileForWrite(finalOutFileSize); Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); - + IntermediateEncryptedStream.addSpillIndexFile(finalIndexFile, job); //The output stream for the final single output file FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); FSDataOutputStream finalPartitionOut = null; @@ -1893,8 +1905,9 @@ public class MapTask extends Task { try { for (int i = 0; i < partitions; i++) { long segmentStart = finalOut.getPos(); - finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, - false); + finalPartitionOut = + IntermediateEncryptedStream.wrapIfNecessary(job, finalOut, + false, finalOutputFile); Writer writer = new Writer(job, finalPartitionOut, keyClass, valClass, codec, null); writer.close(); @@ -1957,7 +1970,8 @@ public class MapTask extends Task { //write merged output to disk long segmentStart = finalOut.getPos(); - finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, false); + 
finalPartitionOut = IntermediateEncryptedStream.wrapIfNecessary(job, + finalOut, false, finalOutputFile); Writer writer = new Writer(job, finalPartitionOut, keyClass, valClass, codec, spilledRecordsCounter); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java index 16f88370bb6..d783752cf71 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java @@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.CryptoUtils; +import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream; import org.apache.hadoop.util.PriorityQueue; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progressable; @@ -302,7 +303,7 @@ public class Merger { FSDataInputStream in = fs.open(file); in.seek(segmentOffset); - in = CryptoUtils.wrapIfNecessary(conf, in); + in = IntermediateEncryptedStream.wrapIfNecessary(conf, in, file); reader = new Reader(conf, in, segmentLength - CryptoUtils.cryptoPadding(conf), codec, readsCounter); @@ -730,7 +731,8 @@ public class Merger { approxOutputSize, conf); FSDataOutputStream out = fs.create(outputFile); - out = CryptoUtils.wrapIfNecessary(conf, out); + out = IntermediateEncryptedStream.wrapIfNecessary(conf, out, + outputFile); Writer writer = new Writer(conf, out, keyClass, valueClass, codec, writesCounter, true); writeFile(this, writer, reporter, conf); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/IntermediateEncryptedStream.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/IntermediateEncryptedStream.java new file mode 100644 index 00000000000..eb14a208c99 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/IntermediateEncryptedStream.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.security; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.CryptoUtils; + +/** + * Used to wrap helpers while spilling intermediate files. + * Setting the {@link SpillCallBackInjector} helps in: + * 1- adding callbacks to capture the path of the spilled files. + * 2- Verifying the encryption when intermediate encryption is enabled. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class IntermediateEncryptedStream { + + private static SpillCallBackInjector prevSpillCBInjector = null; + + public static FSDataOutputStream wrapIfNecessary(Configuration conf, + FSDataOutputStream out, Path outPath) throws IOException { + SpillCallBackInjector.get().writeSpillFileCB(outPath, out, conf); + return CryptoUtils.wrapIfNecessary(conf, out, true); + } + + public static FSDataOutputStream wrapIfNecessary(Configuration conf, + FSDataOutputStream out, boolean closeOutputStream, + Path outPath) throws IOException { + SpillCallBackInjector.get().writeSpillFileCB(outPath, out, conf); + return CryptoUtils.wrapIfNecessary(conf, out, closeOutputStream); + } + + public static FSDataInputStream wrapIfNecessary(Configuration conf, + FSDataInputStream in, Path inputPath) throws IOException { + SpillCallBackInjector.get().getSpillFileCB(inputPath, in, conf); + return CryptoUtils.wrapIfNecessary(conf, in); + } + + public static InputStream wrapIfNecessary(Configuration conf, + InputStream in, long length, Path inputPath) throws IOException { + SpillCallBackInjector.get().getSpillFileCB(inputPath, in, conf); + return CryptoUtils.wrapIfNecessary(conf, in, length); + } + + public static void addSpillIndexFile(Path indexFilename, Configuration conf) { + SpillCallBackInjector.get().addSpillIndexFileCB(indexFilename, conf); + } + + public static void validateSpillIndexFile(Path indexFilename, + Configuration conf) { + SpillCallBackInjector.get().validateSpillIndexFileCB(indexFilename, conf); + } + + public static SpillCallBackInjector resetSpillCBInjector() { + return setSpillCBInjector(prevSpillCBInjector); + } + + public synchronized static SpillCallBackInjector setSpillCBInjector( + SpillCallBackInjector spillInjector) { + prevSpillCBInjector = + SpillCallBackInjector.getAndSet(spillInjector); + return spillInjector; + } + + private IntermediateEncryptedStream() {} +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackInjector.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackInjector.java new file mode 100644 index 00000000000..9b23c518f1a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackInjector.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.security; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * Used for injecting callbacks while spilling files. + * Calls into this are a no-op in production code. + */ +@VisibleForTesting +@InterfaceAudience.Private +public class SpillCallBackInjector { + private static SpillCallBackInjector instance = new SpillCallBackInjector(); + public static SpillCallBackInjector get() { + return instance; + } + /** + * Sets the global SpillFilesCBInjector to the new value, returning the old + * value. + * + * @param spillInjector the new implementation for the spill injector. + * @return the previous implementation. + */ + public static SpillCallBackInjector getAndSet( + SpillCallBackInjector spillInjector) { + SpillCallBackInjector prev = instance; + instance = spillInjector; + return prev; + } + + public void writeSpillIndexFileCB(Path path) { + // do nothing + } + + public void writeSpillFileCB(Path path, FSDataOutputStream out, + Configuration conf) { + // do nothing + } + + public void getSpillFileCB(Path path, InputStream is, Configuration conf) { + // do nothing + } + + public String getSpilledFileReport() { + return null; + } + + public void handleErrorInSpillFill(Path path, Exception e) { + // do nothing + } + + public void corruptSpilledFile(Path fileName) throws IOException { + // do nothing + } + + public void addSpillIndexFileCB(Path path, Configuration conf) { + // do nothing + } + + public void validateSpillIndexFileCB(Path path, Configuration conf) { + // do nothing + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java new file mode 100644 index 00000000000..7be99e556e5 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.security; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CryptoStreamUtils; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.CryptoUtils; + +/** + * An implementation class that keeps track of the spilled files. + */ +public class SpillCallBackPathsFinder extends SpillCallBackInjector { + private static final Logger LOG = + LoggerFactory.getLogger(SpillCallBackPathsFinder.class); + /** + * Encrypted spilled files. + */ + private final Map> encryptedSpillFiles = + Collections.synchronizedMap(new ConcurrentHashMap<>()); + /** + * Non-Encrypted spilled files. + */ + private final Map> spillFiles = + Collections.synchronizedMap(new ConcurrentHashMap<>()); + /** + * Invalid position access. + */ + private final Map> invalidAccessMap = + Collections.synchronizedMap(new ConcurrentHashMap<>()); + /** + * Index spill files. + */ + private final Set indexSpillFiles = ConcurrentHashMap.newKeySet(); + /** + * Paths that were not found in the maps. + */ + private final Set negativeCache = ConcurrentHashMap.newKeySet(); + + protected Map> getFilesMap(Configuration config) { + if (CryptoUtils.isEncryptedSpillEnabled(config)) { + return encryptedSpillFiles; + } + return spillFiles; + } + + @Override + public void writeSpillFileCB(Path path, FSDataOutputStream out, + Configuration conf) { + long outPos = out.getPos(); + getFilesMap(conf) + .computeIfAbsent(path, p -> ConcurrentHashMap.newKeySet()) + .add(outPos); + LOG.debug("writeSpillFileCB.. path:{}; pos:{}", path, outPos); + } + + @Override + public void getSpillFileCB(Path path, InputStream is, Configuration conf) { + if (path == null) { + return; + } + Set pathEntries = getFilesMap(conf).get(path); + if (pathEntries != null) { + try { + long isPos = CryptoStreamUtils.getInputStreamOffset(is); + if (pathEntries.contains(isPos)) { + LOG.debug("getSpillFileCB... Path {}; Pos: {}", path, isPos); + return; + } + invalidAccessMap + .computeIfAbsent(path, p -> ConcurrentHashMap.newKeySet()) + .add(isPos); + LOG.debug("getSpillFileCB... access incorrect position.. " + + "Path {}; Pos: {}", path, isPos); + } catch (IOException e) { + LOG.error("Could not get inputStream position.. Path {}", path, e); + // do nothing + } + return; + } + negativeCache.add(path); + LOG.warn("getSpillFileCB.. Could not find spilled file .. 
Path: {}", path); + } + + @Override + public String getSpilledFileReport() { + StringBuilder strBuilder = + new StringBuilder("\n++++++++ Spill Report ++++++++") + .append(dumpMapEntries("Encrypted Spilled Files", + encryptedSpillFiles)) + .append(dumpMapEntries("Non-Encrypted Spilled Files", + spillFiles)) + .append(dumpMapEntries("Invalid Spill Access", + invalidAccessMap)) + .append("\n ----- Spilled Index Files ----- ") + .append(indexSpillFiles.size()); + for (Path p : indexSpillFiles) { + strBuilder.append("\n\t index-path: ").append(p.toString()); + } + strBuilder.append("\n ----- Negative Cache files ----- ") + .append(negativeCache.size()); + for (Path p : negativeCache) { + strBuilder.append("\n\t path: ").append(p.toString()); + } + return strBuilder.toString(); + } + + @Override + public void addSpillIndexFileCB(Path path, Configuration conf) { + if (path == null) { + return; + } + indexSpillFiles.add(path); + LOG.debug("addSpillIndexFileCB... Path: {}", path); + } + + @Override + public void validateSpillIndexFileCB(Path path, Configuration conf) { + if (path == null) { + return; + } + if (indexSpillFiles.contains(path)) { + LOG.debug("validateSpillIndexFileCB.. Path: {}", path); + return; + } + LOG.warn("validateSpillIndexFileCB.. could not retrieve indexFile.. " + + "Path: {}", path); + negativeCache.add(path); + } + + public Set getEncryptedSpilledFiles() { + return Collections.unmodifiableSet(encryptedSpillFiles.keySet()); + } + + /** + * Gets the set of path:pos of the entries that were accessed incorrectly. + * @return a set of string in the format of {@literal Path[Pos]} + */ + public Set getInvalidSpillEntries() { + Set result = new LinkedHashSet<>(); + for (Entry> spillMapEntry: invalidAccessMap.entrySet()) { + for (Long singleEntry : spillMapEntry.getValue()) { + result.add(String.format("%s[%d]", + spillMapEntry.getKey(), singleEntry)); + } + } + return result; + } + + private String dumpMapEntries(String label, + Map> entriesMap) { + StringBuilder strBuilder = + new StringBuilder(String.format("%n ----- %s ----- %d", label, + entriesMap.size())); + for (Entry> encryptedSpillEntry + : entriesMap.entrySet()) { + strBuilder.append(String.format("%n\t\tpath: %s", + encryptedSpillEntry.getKey())); + for (Long singlePos : encryptedSpillEntry.getValue()) { + strBuilder.append(String.format("%n\t\t\tentry: %d", singlePos)); + } + } + return strBuilder.toString(); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/package-info.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/package-info.java new file mode 100644 index 00000000000..451e6f65503 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +/** + * Helper classes for the shuffle/spill encryptions. + */ +package org.apache.hadoop.mapreduce.security; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 6e29fe47d88..d8bc68c6c71 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.security.ssl.SSLFactory; @@ -512,7 +513,9 @@ class Fetcher extends Thread { } InputStream is = input; - is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength); + is = + IntermediateEncryptedStream.wrapIfNecessary(jobConf, is, + compressedLength, null); compressedLength -= CryptoUtils.cryptoPadding(jobConf); decompressedLength -= CryptoUtils.cryptoPadding(jobConf); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java index 90160cfa07c..3ae1e746fcb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java @@ -36,6 +36,8 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SpillRecord; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.CryptoUtils; +import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,7 +153,9 @@ class LocalFetcher extends Fetcher { FileSystem localFs = FileSystem.getLocal(job).getRaw(); FSDataInputStream inStream = localFs.open(mapOutputFileName); try { - inStream = CryptoUtils.wrapIfNecessary(job, inStream); + inStream = + IntermediateEncryptedStream.wrapIfNecessary(job, inStream, + mapOutputFileName); inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job)); 
mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java index ae44ba4a91d..29724de0eb9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java @@ -53,7 +53,7 @@ import org.apache.hadoop.mapred.Task.CombineValuesIterator; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.CryptoUtils; +import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream; import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.ReflectionUtils; @@ -468,7 +468,9 @@ public class MergeManagerImpl implements MergeManager { mergeOutputSize).suffix( Task.MERGED_OUTPUT_PREFIX); - FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath)); + FSDataOutputStream out = + IntermediateEncryptedStream.wrapIfNecessary(jobConf, + rfs.create(outputPath), outputPath); Writer writer = new Writer(jobConf, out, (Class) jobConf.getMapOutputKeyClass(), (Class) jobConf.getMapOutputValueClass(), codec, null, true); @@ -552,7 +554,9 @@ public class MergeManagerImpl implements MergeManager { localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX); - FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath)); + FSDataOutputStream out = + IntermediateEncryptedStream.wrapIfNecessary(jobConf, + rfs.create(outputPath), outputPath); Writer writer = new Writer(jobConf, out, (Class) jobConf.getMapOutputKeyClass(), (Class) jobConf.getMapOutputValueClass(), codec, null, true); @@ -735,7 +739,9 @@ public class MergeManagerImpl implements MergeManager { tmpDir, comparator, reporter, spilledRecordsCounter, null, mergePhase); - FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath)); + FSDataOutputStream out = + IntermediateEncryptedStream.wrapIfNecessary(job, + fs.create(outputPath), outputPath); Writer writer = new Writer(job, out, keyClass, valueClass, codec, null, true); try { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java index 5f96a030988..54a9522e2cc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java @@ -37,7 +37,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.MapOutputFile; import 
org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.CryptoUtils; +import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream; import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -84,7 +84,8 @@ class OnDiskMapOutput extends IFileWrappedMapOutput { this.fs = fs; this.outputPath = outputPath; tmpOutputPath = getTempPath(outputPath, fetcher); - disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath)); + disk = IntermediateEncryptedStream.wrapIfNecessary(conf, + fs.create(tmpOutputPath), tmpOutputPath); } @VisibleForTesting diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java index 4e4e78e1e3c..4319e174168 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java @@ -17,14 +17,22 @@ */ package org.apache.hadoop.mapreduce.util; +import java.io.File; import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; /** * A class that contains utility methods for MR Job configuration. */ public final class MRJobConfUtil { + private static final Logger LOG = + LoggerFactory.getLogger(MRJobConfUtil.class); public static final String REDACTION_REPLACEMENT_VAL = "*********(redacted)"; /** @@ -130,4 +138,54 @@ public final class MRJobConfUtil { public static double convertTaskProgressToFactor(final float progress) { return Math.floor(progress * MRJobConfUtil.PROGRESS_MIN_DELTA_FACTOR); } + + /** + * For unit tests, use urandom to keep the YarnChild process from hanging + * on low-entropy systems. + */ + private static final String TEST_JVM_SECURITY_EGD_OPT = + "-Djava.security.egd=file:/dev/./urandom"; + + public static Configuration initEncryptedIntermediateConfigsForTesting( + Configuration conf) { + Configuration config = + (conf == null) ? new Configuration(): conf; + final String childJVMOpts = + TEST_JVM_SECURITY_EGD_OPT.concat(" ") + .concat(config.get("mapred.child.java.opts", " ")); + // Set the jvm arguments. + config.set("yarn.app.mapreduce.am.admin-command-opts", + TEST_JVM_SECURITY_EGD_OPT); + config.set("mapred.child.java.opts", childJVMOpts); + config.setBoolean("mapreduce.job.encrypted-intermediate-data", true); + return config; + } + + /** + * Set the local directories so that the generated folders are + * subdirectories of the test root directory. + * @param conf the configuration to update; a new instance is created + * when null. + * @param testRootDir the test root directory that will contain the + * generated folders. + * @return the updated configuration. + */ + public static Configuration setLocalDirectoriesConfigForTesting( + Configuration conf, File testRootDir) { + Configuration config = + (conf == null) ?
new Configuration(): conf; + final File hadoopLocalDir = new File(testRootDir, "hadoop-dir"); + // create the directory + if (!hadoopLocalDir.getAbsoluteFile().mkdirs()) { + LOG.info("{} directory already exists", hadoopLocalDir.getPath()); + } + Path mapredHadoopTempDir = new Path(hadoopLocalDir.getPath()); + Path mapredSystemDir = new Path(mapredHadoopTempDir, "system"); + Path stagingDir = new Path(mapredHadoopTempDir, "tmp/staging"); + // Set the temp directories a subdir of the test directory. + config.set("mapreduce.jobtracker.staging.root.dir", stagingDir.toString()); + config.set("mapreduce.jobtracker.system.dir", mapredSystemDir.toString()); + config.set("mapreduce.cluster.temp.dir", mapredHadoopTempDir.toString()); + config.set("mapreduce.cluster.local.dir", + new Path(mapredHadoopTempDir, "local").toString()); + return config; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java index 13cb6b32214..bd90941fa13 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -50,43 +51,60 @@ import org.apache.hadoop.mapred.Merger; import org.apache.hadoop.mapred.Merger.Segment; import org.apache.hadoop.mapred.RawKeyValueIterator; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath; +import org.apache.hadoop.mapreduce.util.MRJobConfUtil; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progressable; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; public class TestMerger { - - private Configuration conf; + private static File testRootDir; + @Rule + public TestName unitTestName = new TestName(); + private File unitTestDir; private JobConf jobConf; private FileSystem fs; - @Before - public void setup() throws IOException { - conf = new Configuration(); - jobConf = new JobConf(); - fs = FileSystem.getLocal(conf); + @BeforeClass + public static void setupClass() throws Exception { + // setup the test root directory + testRootDir = + GenericTestUtils.setupTestRootDir( 
+ TestMerger.class); } + @Before + public void setup() throws IOException { + unitTestDir = new File(testRootDir, unitTestName.getMethodName()); + unitTestDir.mkdirs(); + jobConf = new JobConf(); + // Set the temp directories a subdir of the test directory. + MRJobConfUtil.setLocalDirectoriesConfigForTesting(jobConf, unitTestDir); + jobConf.set(MRConfig.FRAMEWORK_NAME, "local"); + fs = FileSystem.getLocal(jobConf); + } @Test public void testEncryptedMerger() throws Throwable { - jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); - conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); + // Enable intermediate encryption. + MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(jobConf); Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); TokenCache.setEncryptedSpillKey(new byte[16], credentials); UserGroupInformation.getCurrentUser().addCredentials(credentials); @@ -106,8 +124,8 @@ public class TestMerger { LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR); MergeManagerImpl mergeManager = new MergeManagerImpl( - reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null, - null, null, new Progress(), new MROutputFiles()); + reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, + null, null, null, new Progress(), new MROutputFiles()); // write map outputs Map map1 = new TreeMap(); @@ -115,12 +133,12 @@ public class TestMerger { map1.put("carrot", "delicious"); Map map2 = new TreeMap(); map1.put("banana", "pretty good"); - byte[] mapOutputBytes1 = writeMapOutput(conf, map1); - byte[] mapOutputBytes2 = writeMapOutput(conf, map2); + byte[] mapOutputBytes1 = writeMapOutput(jobConf, map1); + byte[] mapOutputBytes2 = writeMapOutput(jobConf, map2); InMemoryMapOutput mapOutput1 = new InMemoryMapOutput( - conf, mapId1, mergeManager, mapOutputBytes1.length, null, true); + jobConf, mapId1, mergeManager, mapOutputBytes1.length, null, true); InMemoryMapOutput mapOutput2 = new InMemoryMapOutput( - conf, mapId2, mergeManager, mapOutputBytes2.length, null, true); + jobConf, mapId2, mergeManager, mapOutputBytes2.length, null, true); System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0, mapOutputBytes1.length); System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0, @@ -150,12 +168,12 @@ public class TestMerger { map3.put("carrot", "amazing"); Map map4 = new TreeMap(); map4.put("banana", "bla"); - byte[] mapOutputBytes3 = writeMapOutput(conf, map3); - byte[] mapOutputBytes4 = writeMapOutput(conf, map4); + byte[] mapOutputBytes3 = writeMapOutput(jobConf, map3); + byte[] mapOutputBytes4 = writeMapOutput(jobConf, map4); InMemoryMapOutput mapOutput3 = new InMemoryMapOutput( - conf, mapId3, mergeManager, mapOutputBytes3.length, null, true); + jobConf, mapId3, mergeManager, mapOutputBytes3.length, null, true); InMemoryMapOutput mapOutput4 = new InMemoryMapOutput( - conf, mapId4, mergeManager, mapOutputBytes4.length, null, true); + jobConf, mapId4, mergeManager, mapOutputBytes4.length, null, true); System.arraycopy(mapOutputBytes3, 0, mapOutput3.getMemory(), 0, mapOutputBytes3.length); System.arraycopy(mapOutputBytes4, 0, mapOutput4.getMemory(), 0, @@ -174,12 +192,13 @@ public class TestMerger { Assert.assertEquals(2, mergeManager.onDiskMapOutputs.size()); List paths = new ArrayList(); - Iterator iterator = mergeManager.onDiskMapOutputs.iterator(); + Iterator iterator = + mergeManager.onDiskMapOutputs.iterator(); List keys = new ArrayList(); List values = new ArrayList(); while 
(iterator.hasNext()) { CompressAwarePath next = iterator.next(); - readOnDiskMapOutput(conf, fs, next, keys, values); + readOnDiskMapOutput(jobConf, fs, next, keys, values); paths.add(next); } assertThat(keys).isEqualTo(Arrays.asList("apple", "banana", "carrot", @@ -189,8 +208,8 @@ public class TestMerger { mergeManager.close(); mergeManager = new MergeManagerImpl( - reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null, - null, null, new Progress(), new MROutputFiles()); + reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, + null, null, null, new Progress(), new MROutputFiles()); MergeThread onDiskMerger = mergeManager.createOnDiskMerger(); onDiskMerger.merge(paths); @@ -199,7 +218,8 @@ public class TestMerger { keys = new ArrayList(); values = new ArrayList(); - readOnDiskMapOutput(conf, fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values); + readOnDiskMapOutput(jobConf, fs, + mergeManager.onDiskMapOutputs.iterator().next(), keys, values); assertThat(keys).isEqualTo(Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot")); assertThat(values).isEqualTo(Arrays.asList("awesome", "disgusting", @@ -227,7 +247,8 @@ public class TestMerger { private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path, List keys, List values) throws IOException { - FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path)); + FSDataInputStream in = + IntermediateEncryptedStream.wrapIfNecessary(conf, fs.open(path), path); IFile.Reader reader = new IFile.Reader(conf, in, fs.getFileStatus(path).getLen(), null, null); @@ -257,14 +278,16 @@ public class TestMerger { @SuppressWarnings( { "unchecked" }) public void testMergeShouldReturnProperProgress( List> segments) throws IOException { - Path tmpDir = new Path("localpath"); + + Path tmpDir = new Path(jobConf.get("mapreduce.cluster.temp.dir"), + "localpath"); Class keyClass = (Class) jobConf.getMapOutputKeyClass(); Class valueClass = (Class) jobConf.getMapOutputValueClass(); RawComparator comparator = jobConf.getOutputKeyComparator(); Counter readsCounter = new Counter(); Counter writesCounter = new Counter(); Progress mergePhase = new Progress(); - RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass, + RawKeyValueIterator mergeQueue = Merger.merge(jobConf, fs, keyClass, valueClass, segments, 2, tmpDir, comparator, getReporter(), readsCounter, writesCounter, mergePhase); final float epsilon = 0.00001f; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java index a3ea26e81f0..c8b6c894d0c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java @@ -31,8 +31,20 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.SleepJob; +import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream; +import org.apache.hadoop.mapreduce.security.SpillCallBackPathsFinder; +import org.apache.hadoop.mapreduce.util.MRJobConfUtil; +import 
org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.ToolRunner; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.junit.Assert.*; @@ -41,8 +53,39 @@ import static org.junit.Assert.*; * -jt local -libjars */ public class TestLocalJobSubmission { - private static Path TEST_ROOT_DIR = - new Path(System.getProperty("test.build.data","/tmp")); + private static final Logger LOG = + LoggerFactory.getLogger(TestLocalJobSubmission.class); + + private static File testRootDir; + + @Rule + public TestName unitTestName = new TestName(); + private File unitTestDir; + private Path jarPath; + private Configuration config; + + @BeforeClass + public static void setupClass() throws Exception { + // setup the test root directory + testRootDir = + GenericTestUtils.setupTestRootDir(TestLocalJobSubmission.class); + } + + @Before + public void setup() throws IOException { + unitTestDir = new File(testRootDir, unitTestName.getMethodName()); + unitTestDir.mkdirs(); + config = createConfig(); + jarPath = makeJar(new Path(unitTestDir.getAbsolutePath(), "test.jar")); + } + + private Configuration createConfig() { + // Set the temp directories a subdir of the test directory. + Configuration conf = + MRJobConfUtil.setLocalDirectoriesConfigForTesting(null, unitTestDir); + conf.set(MRConfig.FRAMEWORK_NAME, "local"); + return conf; + } /** * Test the local job submission options of -jt local -libjars. @@ -51,12 +94,9 @@ public class TestLocalJobSubmission { */ @Test public void testLocalJobLibjarsOption() throws IOException { - Configuration conf = new Configuration(); - - testLocalJobLibjarsOption(conf); - - conf.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false); - testLocalJobLibjarsOption(conf); + testLocalJobLibjarsOption(config); + config.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false); + testLocalJobLibjarsOption(config); } /** @@ -67,8 +107,6 @@ public class TestLocalJobSubmission { */ private void testLocalJobLibjarsOption(Configuration conf) throws IOException { - Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar")); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000"); conf.set(MRConfig.FRAMEWORK_NAME, "local"); final String[] args = { @@ -79,8 +117,7 @@ public class TestLocalJobSubmission { try { res = ToolRunner.run(conf, new SleepJob(), args); } catch (Exception e) { - System.out.println("Job failed with " + e.getLocalizedMessage()); - e.printStackTrace(System.out); + LOG.error("Job failed with {}", e.getLocalizedMessage(), e); fail("Job failed"); } assertEquals("dist job res is not 0:", 0, res); @@ -93,18 +130,20 @@ public class TestLocalJobSubmission { */ @Test public void testLocalJobEncryptedIntermediateData() throws IOException { - Configuration conf = new Configuration(); - conf.set(MRConfig.FRAMEWORK_NAME, "local"); - conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); + config = MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(config); final String[] args = { "-m", "1", "-r", "1", "-mt", "1", "-rt", "1" }; int res = -1; try { - res = ToolRunner.run(conf, new SleepJob(), args); + SpillCallBackPathsFinder spillInjector = + (SpillCallBackPathsFinder) IntermediateEncryptedStream + .setSpillCBInjector(new SpillCallBackPathsFinder()); + res = ToolRunner.run(config, new SleepJob(), args); + Assert.assertTrue("No spill occurred", + 
spillInjector.getEncryptedSpilledFiles().size() > 0); } catch (Exception e) { - System.out.println("Job failed with " + e.getLocalizedMessage()); - e.printStackTrace(System.out); + LOG.error("Job failed with {}", e.getLocalizedMessage(), e); fail("Job failed"); } assertEquals("dist job res is not 0:", 0, res); @@ -116,15 +155,13 @@ */ @Test public void testJobMaxMapConfig() throws Exception { - Configuration conf = new Configuration(); - conf.set(MRConfig.FRAMEWORK_NAME, "local"); - conf.setInt(MRJobConfig.JOB_MAX_MAP, 0); + config.setInt(MRJobConfig.JOB_MAX_MAP, 0); final String[] args = { "-m", "1", "-r", "1", "-mt", "1", "-rt", "1" }; int res = -1; try { - res = ToolRunner.run(conf, new SleepJob(), args); + res = ToolRunner.run(config, new SleepJob(), args); fail("Job should fail"); } catch (IllegalArgumentException e) { assertTrue(e.getLocalizedMessage().contains( @@ -139,20 +176,16 @@ */ @Test public void testLocalJobFilesOption() throws IOException { - Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar")); - - Configuration conf = new Configuration(); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000"); - conf.set(MRConfig.FRAMEWORK_NAME, "local"); - final String[] args = {"-jt", "local", "-files", jarPath.toString(), "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"}; + config.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000"); + final String[] args = { + "-jt", "local", "-files", jarPath.toString(), + "-m", "1", "-r", "1", "-mt", "1", "-rt", "1" + }; int res = -1; try { - res = ToolRunner.run(conf, new SleepJob(), args); + res = ToolRunner.run(config, new SleepJob(), args); } catch (Exception e) { - System.out.println("Job failed with " + e.getLocalizedMessage()); - e.printStackTrace(System.out); + LOG.error("Job failed with {}", e.getLocalizedMessage(), e); fail("Job failed"); } assertEquals("dist job res is not 0:", 0, res); @@ -165,27 +198,22 @@ */ @Test public void testLocalJobArchivesOption() throws IOException { - Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar")); - - Configuration conf = new Configuration(); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000"); - conf.set(MRConfig.FRAMEWORK_NAME, "local"); + config.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000"); final String[] args = {"-jt", "local", "-archives", jarPath.toString(), "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"}; int res = -1; try { - res = ToolRunner.run(conf, new SleepJob(), args); + res = ToolRunner.run(config, new SleepJob(), args); } catch (Exception e) { - System.out.println("Job failed with " + e.getLocalizedMessage()); - e.printStackTrace(System.out); + LOG.error("Job failed with {}", e.getLocalizedMessage(), e); fail("Job failed"); } assertEquals("dist job res is not 0:", 0, res); } private Path makeJar(Path p) throws IOException { - FileOutputStream fos = new FileOutputStream(new File(p.toString())); + FileOutputStream fos = new FileOutputStream(p.toString()); JarOutputStream jos = new JarOutputStream(fos); ZipEntry ze = new ZipEntry("test.jar.inside"); jos.putNextEntry(ze); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java deleted file
mode 100644 index fa8dacf6dd5..00000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java +++ /dev/null @@ -1,327 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.mapred; - -import java.util.Arrays; -import java.util.Collection; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.*; - -@SuppressWarnings(value={"unchecked", "deprecation"}) -/** - * This test tests the support for a merge operation in Hadoop. The input files - * are already sorted on the key. This test implements an external - * MapOutputCollector implementation that just copies the records to different - * partitions while maintaining the sort order in each partition. The Hadoop - * framework's merge on the reduce side will merge the partitions created to - * generate the final output which is sorted on the key. - */ -@RunWith(Parameterized.class) -public class TestMRIntermediateDataEncryption { - private static final Logger LOG = - LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class); - /** - * Use urandom to avoid the YarnChild process from hanging on low entropy - * systems. - */ - private static final String JVM_SECURITY_EGD_OPT = - "-Djava.security.egd=file:/dev/./urandom"; - // Where MR job's input will reside. - private static final Path INPUT_DIR = new Path("/test/input"); - // Where output goes. - private static final Path OUTPUT = new Path("/test/output"); - private static final int NUM_LINES = 1000; - private static MiniMRClientCluster mrCluster = null; - private static MiniDFSCluster dfsCluster = null; - private static FileSystem fs = null; - private static final int NUM_NODES = 2; - - private final String testTitle; - private final int numMappers; - private final int numReducers; - private final boolean isUber; - - /** - * List of arguments to run the JunitTest. 
- * @return - */ - @Parameterized.Parameters( - name = "{index}: TestMRIntermediateDataEncryption.{0} .. " - + "mappers:{1}, reducers:{2}, isUber:{3})") - public static Collection getTestParameters() { - return Arrays.asList(new Object[][]{ - {"testSingleReducer", 3, 1, false}, - {"testUberMode", 3, 1, true}, - {"testMultipleMapsPerNode", 8, 1, false}, - {"testMultipleReducers", 2, 4, false} - }); - } - - /** - * Initialized the parametrized JUnit test. - * @param testName the name of the unit test to be executed. - * @param mappers number of mappers in the tests. - * @param reducers number of the reducers. - * @param uberEnabled boolean flag for isUber - */ - public TestMRIntermediateDataEncryption(String testName, int mappers, - int reducers, boolean uberEnabled) { - this.testTitle = testName; - this.numMappers = mappers; - this.numReducers = reducers; - this.isUber = uberEnabled; - } - - @BeforeClass - public static void setupClass() throws Exception { - Configuration conf = new Configuration(); - conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); - - // Set the jvm arguments. - conf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, - JVM_SECURITY_EGD_OPT); - final String childJVMOpts = JVM_SECURITY_EGD_OPT - + " " + conf.get("mapred.child.java.opts", " "); - conf.set("mapred.child.java.opts", childJVMOpts); - - - // Start the mini-MR and mini-DFS clusters. - dfsCluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(NUM_NODES).build(); - mrCluster = - MiniMRClientClusterFactory.create( - TestMRIntermediateDataEncryption.class, NUM_NODES, conf); - mrCluster.start(); - } - - @AfterClass - public static void tearDown() throws IOException { - if (fs != null) { - fs.close(); - } - if (mrCluster != null) { - mrCluster.stop(); - } - if (dfsCluster != null) { - dfsCluster.shutdown(); - } - } - - @Before - public void setup() throws Exception { - LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", testTitle); - fs = dfsCluster.getFileSystem(); - if (fs.exists(INPUT_DIR) && !fs.delete(INPUT_DIR, true)) { - throw new IOException("Could not delete " + INPUT_DIR); - } - if (fs.exists(OUTPUT) && !fs.delete(OUTPUT, true)) { - throw new IOException("Could not delete " + OUTPUT); - } - // Generate input. 
- createInput(fs, numMappers, NUM_LINES); - } - - @After - public void cleanup() throws IOException { - if (fs != null) { - if (fs.exists(OUTPUT)) { - fs.delete(OUTPUT, true); - } - if (fs.exists(INPUT_DIR)) { - fs.delete(INPUT_DIR, true); - } - } - } - - @Test(timeout=600000) - public void testMerge() throws Exception { - JobConf job = new JobConf(mrCluster.getConfig()); - job.setJobName("Test"); - JobClient client = new JobClient(job); - RunningJob submittedJob = null; - FileInputFormat.setInputPaths(job, INPUT_DIR); - FileOutputFormat.setOutputPath(job, OUTPUT); - job.set("mapreduce.output.textoutputformat.separator", " "); - job.setInputFormat(TextInputFormat.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Text.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - job.setMapperClass(TestMRIntermediateDataEncryption.MyMapper.class); - job.setPartitionerClass( - TestMRIntermediateDataEncryption.MyPartitioner.class); - job.setOutputFormat(TextOutputFormat.class); - job.setNumReduceTasks(numReducers); - job.setInt("mapreduce.map.maxattempts", 1); - job.setInt("mapreduce.reduce.maxattempts", 1); - job.setInt("mapred.test.num_lines", NUM_LINES); - job.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber); - job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); - submittedJob = client.submitJob(job); - submittedJob.waitForCompletion(); - assertTrue("The submitted job is completed", submittedJob.isComplete()); - assertTrue("The submitted job is successful", submittedJob.isSuccessful()); - verifyOutput(fs, numMappers, NUM_LINES); - client.close(); - // wait for short period to cool down. - Thread.sleep(1000); - } - - private void createInput(FileSystem filesystem, int mappers, int numLines) - throws Exception { - for (int i = 0; i < mappers; i++) { - OutputStream os = - filesystem.create(new Path(INPUT_DIR, "input_" + i + ".txt")); - Writer writer = new OutputStreamWriter(os); - for (int j = 0; j < numLines; j++) { - // Create sorted key, value pairs. - int k = j + 1; - String formattedNumber = String.format("%09d", k); - writer.write(formattedNumber + " " + formattedNumber + "\n"); - } - writer.close(); - os.close(); - } - } - - private void verifyOutput(FileSystem fileSystem, - int mappers, int numLines) - throws Exception { - FSDataInputStream dis = null; - long numValidRecords = 0; - long numInvalidRecords = 0; - String prevKeyValue = "000000000"; - Path[] fileList = - FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT, - new Utils.OutputFileUtils.OutputFilesFilter())); - for (Path outFile : fileList) { - try { - dis = fileSystem.open(outFile); - String record; - while((record = dis.readLine()) != null) { - // Split the line into key and value. - int blankPos = record.indexOf(" "); - String keyString = record.substring(0, blankPos); - String valueString = record.substring(blankPos+1); - // Check for sorted output and correctness of record. - if (keyString.compareTo(prevKeyValue) >= 0 - && keyString.equals(valueString)) { - prevKeyValue = keyString; - numValidRecords++; - } else { - numInvalidRecords++; - } - } - } finally { - if (dis != null) { - dis.close(); - dis = null; - } - } - } - // Make sure we got all input records in the output in sorted order. - assertEquals((long)(mappers * numLines), numValidRecords); - // Make sure there is no extraneous invalid record. - assertEquals(0, numInvalidRecords); - } - - /** - * A mapper implementation that assumes that key text contains valid integers - * in displayable form. 
- */ - public static class MyMapper extends MapReduceBase - implements Mapper { - private Text keyText; - private Text valueText; - - public MyMapper() { - keyText = new Text(); - valueText = new Text(); - } - - @Override - public void map(LongWritable key, Text value, - OutputCollector output, - Reporter reporter) throws IOException { - String record = value.toString(); - int blankPos = record.indexOf(" "); - keyText.set(record.substring(0, blankPos)); - valueText.set(record.substring(blankPos + 1)); - output.collect(keyText, valueText); - } - - public void close() throws IOException { - } - } - - /** - * Partitioner implementation to make sure that output is in total sorted - * order. We basically route key ranges to different reducers such that - * key values monotonically increase with the partition number. For example, - * in this test, the keys are numbers from 1 to 1000 in the form "000000001" - * to "000001000" in each input file. The keys "000000001" to "000000250" are - * routed to partition 0, "000000251" to "000000500" are routed to partition 1 - * and so on since we have 4 reducers. - */ - static class MyPartitioner implements Partitioner { - - private JobConf job; - - public MyPartitioner() { - } - - public void configure(JobConf job) { - this.job = job; - } - - public int getPartition(Text key, Text value, int numPartitions) { - int keyValue = 0; - try { - keyValue = Integer.parseInt(key.toString()); - } catch (NumberFormatException nfe) { - keyValue = 0; - } - int partitionNumber = (numPartitions * (Math.max(0, keyValue - 1))) / job - .getInt("mapred.test.num_lines", 10000); - return partitionNumber; - } - } -} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java index eed731ffd37..c2a966302cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.util.MRJobConfUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Test; @@ -79,7 +80,8 @@ public class TestMROpportunisticMaps { MiniMRClientCluster mrCluster = null; FileSystem fileSystem = null; try { - Configuration conf = new Configuration(); + Configuration conf = + MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null); // Start the mini-MR and mini-DFS clusters conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); conf.setBoolean(YarnConfiguration. 
@@ -149,7 +151,6 @@ public class TestMROpportunisticMaps { job.setInt("mapreduce.map.maxattempts", 1); job.setInt("mapreduce.reduce.maxattempts", 1); job.setInt("mapred.test.num_lines", numLines); - job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); try { submittedJob = client.submitJob(job); try { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java index a9e7f64c0b8..b8a16e146e9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java @@ -87,12 +87,12 @@ public class TestMerge { // Run the test. runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem); } finally { - if (dfsCluster != null) { - dfsCluster.shutdown(); - } if (mrCluster != null) { mrCluster.stop(); } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java index 0bf30c830b2..dca39dfd71f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.Tool; @@ -99,6 +98,15 @@ public class RandomTextWriter extends Configured implements Tool { */ enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN } + public static String generateSentenceWithRand(ThreadLocalRandom rand, + int noWords) { + StringBuilder sentence = + new StringBuilder(words[rand.nextInt(words.length)]); + for (int i = 1; i < noWords; i++) { + sentence.append(" ").append(words[rand.nextInt(words.length)]); + } + return sentence.toString(); + } + static class RandomTextMapper extends Mapper<Text, Text, Text, Text> { private long numBytesToWrite; @@ -106,7 +114,6 @@ public class RandomTextWriter extends Configured implements Tool { private int wordsInKeyRange; private int minWordsInValue; private int wordsInValueRange; - private Random random = new Random(); /** * Save the configuration value that we need to write the data.
@@ -127,12 +134,13 @@ public class RandomTextWriter extends Configured implements Tool { public void map(Text key, Text value, Context context) throws IOException,InterruptedException { int itemCount = 0; + ThreadLocalRandom rand = ThreadLocalRandom.current(); while (numBytesToWrite > 0) { // Generate the key/value - int noWordsKey = minWordsInKey + - (wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0); - int noWordsValue = minWordsInValue + - (wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0); + int noWordsKey = minWordsInKey + + (wordsInKeyRange != 0 ? rand.nextInt(wordsInKeyRange) : 0); + int noWordsValue = minWordsInValue + + (wordsInValueRange != 0 ? rand.nextInt(wordsInValueRange) : 0); Text keyWords = generateSentence(noWordsKey); Text valueWords = generateSentence(noWordsValue); @@ -154,13 +162,9 @@ public class RandomTextWriter extends Configured implements Tool { } private Text generateSentence(int noWords) { - StringBuffer sentence = new StringBuffer(); - String space = " "; - for (int i=0; i < noWords; ++i) { - sentence.append(words[random.nextInt(words.length)]); - sentence.append(space); - } - return new Text(sentence.toString()); + String sentence = + generateSentenceWithRand(ThreadLocalRandom.current(), noWords); + return new Text(sentence); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java new file mode 100644 index 00000000000..79fcd4110ca --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +package org.apache.hadoop.mapreduce; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.StringTokenizer; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.MiniMRClientCluster; +import org.apache.hadoop.mapred.MiniMRClientClusterFactory; +import org.apache.hadoop.mapred.Utils; + +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; +import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream; +import org.apache.hadoop.mapreduce.security.SpillCallBackPathsFinder; +import org.apache.hadoop.mapreduce.util.MRJobConfUtil; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.ToolRunner; + +/** + * This class tests the support for intermediate data encryption + * (spill data encryption). + * It starts by generating a random input text file ({@link RandomTextWriter}) + * using the {@link ToolRunner}. + * A wordCount job consumes the generated input. The final job is configured in + * a way to guarantee that data is spilled. + * mbs-per-map specifies the amount of data (in MBs) to generate per map. + * By default, this is twice the value of mapreduce.task.io.sort.mb. + * map-tasks specifies the number of map tasks to run. + */ +@RunWith(Parameterized.class) +public class TestMRIntermediateDataEncryption { + /** + * The size of the input generated by the input generator, in megabytes. + */ + public static final long TOTAL_MBS_DEFAULT = 128L; + public static final long BLOCK_SIZE_DEFAULT = 32 * 1024 * 1024L; + public static final int INPUT_GEN_NUM_THREADS = 16; + public static final long TASK_SORT_IO_MB_DEFAULT = 128L; + public static final String JOB_DIR_PATH = "jobs-data-path"; + private static final Logger LOG = + LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class); + /** + * Directory of the test data. + */ + private static File testRootDir; + private static volatile BufferedWriter inputBufferedWriter; + private static Configuration commonConfig; + private static MiniDFSCluster dfsCluster; + private static MiniMRClientCluster mrCluster; + private static FileSystem fs; + private static Path jobInputDirPath; + private static long inputFileSize; + /** + * Test parameters.
+ */ + private final String testTitleName; + private final int numMappers; + private final int numReducers; + private final boolean isUber; + private Configuration config; + private Path jobOutputPath; + + /** + * Initializes the parameterized JUnit test. + * @param testName the name of the unit test to be executed. + * @param mappers number of mappers in the tests. + * @param reducers number of the reducers. + * @param uberEnabled boolean flag for isUber + */ + public TestMRIntermediateDataEncryption(String testName, int mappers, + int reducers, boolean uberEnabled) { + this.testTitleName = testName; + this.numMappers = mappers; + this.numReducers = reducers; + this.isUber = uberEnabled; + } + + /** + * Lists the parameters used to run the JUnit test. + * @return the collection of test parameters. + */ + @Parameterized.Parameters( + name = "{index}: TestMRIntermediateDataEncryption.{0} .. " + + "mappers:{1}, reducers:{2}, isUber:{3})") + public static Collection<Object[]> getTestParameters() { + return Arrays.asList(new Object[][]{ + {"testSingleReducer", 3, 1, false}, + {"testUberMode", 3, 1, true}, + {"testMultipleMapsPerNode", 8, 1, false}, + // TODO: The following configuration is commented out until + // MAPREDUCE-7325 is fixed. + // Setting multiple reducers breaks LocalJobRunner causing the + // unit test to fail. + // {"testMultipleReducers", 2, 4, false} + }); + } + + @BeforeClass + public static void setupClass() throws Exception { + // setup the test root directory + testRootDir = + GenericTestUtils.setupTestRootDir( + TestMRIntermediateDataEncryption.class); + // setup the base configurations and the clusters + final File dfsFolder = new File(testRootDir, "dfs"); + final Path jobsDirPath = new Path(JOB_DIR_PATH); + + commonConfig = createBaseConfiguration(); + dfsCluster = + new MiniDFSCluster.Builder(commonConfig, dfsFolder) + .numDataNodes(2).build(); + dfsCluster.waitActive(); + mrCluster = MiniMRClientClusterFactory.create( + TestMRIntermediateDataEncryption.class, 2, commonConfig); + mrCluster.start(); + fs = dfsCluster.getFileSystem(); + if (fs.exists(jobsDirPath) && !fs.delete(jobsDirPath, true)) { + throw new IOException("Could not delete " + jobsDirPath); + } + fs.mkdirs(jobsDirPath); + jobInputDirPath = new Path(jobsDirPath, "in-dir"); + // run the input generator job. + Assert.assertEquals("Generating input should succeed", 0, + generateInputTextFile()); + } + + @AfterClass + public static void tearDown() throws IOException { + // shutdown clusters + if (mrCluster != null) { + mrCluster.stop(); + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + // make sure that the generated input file is deleted + final File textInputFile = new File(testRootDir, "input.txt"); + if (textInputFile.exists()) { + textInputFile.delete(); + } + } + + /** + * Creates a configuration object setting the common properties before + * initializing the clusters. + * @return configuration to be used as a base for the unit tests. + */ + private static Configuration createBaseConfiguration() { + // Set the jvm arguments to enable intermediate encryption. + Configuration conf = + MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null); + // Set the temp directories to a subdir of the test directory. + conf = MRJobConfUtil.setLocalDirectoriesConfigForTesting(conf, testRootDir); + conf.setLong("dfs.blocksize", BLOCK_SIZE_DEFAULT); + return conf; + } + + /** + * Creates a thread-safe BufferedWriter to be used among the task generators. + * @return A synchronized BufferedWriter to the input file.
+ * @throws IOException + */ + private static synchronized BufferedWriter getTextInputWriter() + throws IOException { + if (inputBufferedWriter == null) { + final File textInputFile = new File(testRootDir, "input.txt"); + inputBufferedWriter = new BufferedWriter(new FileWriter(textInputFile)); + } + return inputBufferedWriter; + } + + /** + * Generates an input text file of TOTAL_MBS_DEFAULT megabytes. + * It creates a total of INPUT_GEN_NUM_THREADS future tasks. + * + * @return the result of the input generation. 0 for success. + * @throws Exception + */ + private static int generateInputTextFile() throws Exception { + final File textInputFile = new File(testRootDir, "input.txt"); + final AtomicLong actualWrittenBytes = new AtomicLong(0); + // create INPUT_GEN_NUM_THREADS callables + final ExecutorService executor = + Executors.newFixedThreadPool(INPUT_GEN_NUM_THREADS); + // create a list to hold the Future objects associated with the callables + final List<Future<Long>> inputGenerators = new ArrayList<>(); + final Callable<Long> callableGen = new InputGeneratorTask(); + final long startTime = Time.monotonicNow(); + for (int i = 0; i < INPUT_GEN_NUM_THREADS; i++) { + // submit Callable tasks to be executed by the thread pool + Future<Long> genFutureTask = executor.submit(callableGen); + inputGenerators.add(genFutureTask); + } + for (Future<Long> genFutureTask : inputGenerators) { + // Future.get() blocks until the task completes, so the log output + // may be delayed. + LOG.info("Received one task. Current total bytes: {}", + actualWrittenBytes.addAndGet(genFutureTask.get())); + } + getTextInputWriter().close(); + final long endTime = Time.monotonicNow(); + LOG.info("Finished generating input. Wrote {} bytes in {} seconds", + actualWrittenBytes.get(), ((endTime - startTime) * 1.0) / 1000); + executor.shutdown(); + // copy text file to HDFS deleting the source. + fs.mkdirs(jobInputDirPath); + Path textInputPath = + fs.makeQualified(new Path(jobInputDirPath, "input.txt")); + fs.copyFromLocalFile(true, new Path(textInputFile.getAbsolutePath()), + textInputPath); + if (!fs.exists(textInputPath)) { + // the file was not generated. Fail. + return 1; + } + // update the input size. + FileStatus[] fileStatus = + fs.listStatus(textInputPath); + inputFileSize = fileStatus[0].getLen(); + LOG.info("Text input file; path: {}, size: {}", + textInputPath, inputFileSize); + return 0; + } + + @Before + public void setup() throws Exception { + LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", + testTitleName); + final Path jobDirPath = new Path(JOB_DIR_PATH, testTitleName); + if (fs.exists(jobDirPath) && !fs.delete(jobDirPath, true)) { + throw new IOException("Could not delete " + jobDirPath); + } + fs.mkdirs(jobDirPath); + jobOutputPath = new Path(jobDirPath, "out-dir"); + // Set the configuration for the job. + config = new Configuration(commonConfig); + config.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber); + config.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F); + // set the configuration to make sure that we get spilled files + long ioSortMb = TASK_SORT_IO_MB_DEFAULT; + config.setLong(MRJobConfig.IO_SORT_MB, ioSortMb); + long mapMb = Math.max(2 * ioSortMb, config.getInt(MRJobConfig.MAP_MEMORY_MB, + MRJobConfig.DEFAULT_MAP_MEMORY_MB)); + // make sure the map tasks will spill to disk.
config.setLong(MRJobConfig.MAP_MEMORY_MB, mapMb); + config.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m"); + config.setInt(MRJobConfig.NUM_MAPS, numMappers); + // max attempts have to be set to 1 when intermediate encryption is enabled. + config.setInt("mapreduce.map.maxattempts", 1); + config.setInt("mapreduce.reduce.maxattempts", 1); + } + + @Test + public void testWordCount() throws Exception { + LOG.info("........Starting main Job Driver #{} at {}.......", + testTitleName, Time.formatTime(System.currentTimeMillis())); + Job job = Job.getInstance(config); + job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, numMappers); + job.setJarByClass(TestMRIntermediateDataEncryption.class); + job.setJobName("mr-spill-" + testTitleName); + // Mapper configuration + job.setMapperClass(TokenizerMapper.class); + job.setInputFormatClass(TextInputFormat.class); + job.setCombinerClass(LongSumReducer.class); + FileInputFormat.setMinInputSplitSize(job, + (inputFileSize + numMappers) / numMappers); + // Reducer configuration + job.setReducerClass(LongSumReducer.class); + job.setNumReduceTasks(numReducers); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + // Set the IO paths for the job. + FileInputFormat.addInputPath(job, jobInputDirPath); + FileOutputFormat.setOutputPath(job, jobOutputPath); + SpillCallBackPathsFinder spillInjector = + (SpillCallBackPathsFinder) IntermediateEncryptedStream + .setSpillCBInjector(new SpillCallBackPathsFinder()); + StringBuilder testSummary = + new StringBuilder(String.format("%n ===== test %s summary ======", + testTitleName)); + try { + long startTime = Time.monotonicNow(); + testSummary.append(String.format("%nJob %s started at %s", + testTitleName, Time.formatTime(System.currentTimeMillis()))); + Assert.assertTrue(job.waitForCompletion(true)); + long endTime = Time.monotonicNow(); + testSummary.append(String.format("%nJob %s ended at %s", + job.getJobName(), Time.formatTime(System.currentTimeMillis()))); + testSummary.append(String.format("%n\tThe job took %.3f seconds", + (1.0 * (endTime - startTime)) / 1000)); + long spilledRecords = + job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue(); + Assert.assertFalse( + "The encrypted spilled files should not be empty.", + spillInjector.getEncryptedSpilledFiles().isEmpty()); + Assert.assertTrue("Spill records must be greater than 0", + spilledRecords > 0); + Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist", + fs.exists(jobOutputPath)); + Assert.assertTrue("Invalid access to spill file positions", + spillInjector.getInvalidSpillEntries().isEmpty()); + FileStatus[] fileStatus = + fs.listStatus(jobOutputPath, + new Utils.OutputFileUtils.OutputFilesFilter()); + for (FileStatus fStatus : fileStatus) { + long fileSize = fStatus.getLen(); + testSummary.append( + String.format("%n\tOutput file %s: %d", + fStatus.getPath(), fileSize)); + } + } finally { + testSummary.append(spillInjector.getSpilledFileReport()); + LOG.info(testSummary.toString()); + IntermediateEncryptedStream.resetSpillCBInjector(); + } + } + + /** + * A callable implementation that generates a portion of the + * TOTAL_MBS_DEFAULT into {@link #inputBufferedWriter}.
+ */ + static class InputGeneratorTask implements Callable<Long> { + @Override + public Long call() throws Exception { + long bytesWritten = 0; + final ThreadLocalRandom rand = ThreadLocalRandom.current(); + final long totalBytes = 1024 * 1024 * TOTAL_MBS_DEFAULT; + final long bytesPerTask = totalBytes / INPUT_GEN_NUM_THREADS; + final String newLine = System.lineSeparator(); + final BufferedWriter writer = getTextInputWriter(); + while (bytesWritten < bytesPerTask) { + String sentence = + RandomTextWriter.generateSentenceWithRand(rand, rand.nextInt(5, 20)) + .concat(newLine); + writer.write(sentence); + bytesWritten += sentence.length(); + } + writer.flush(); + LOG.info("Task {} finished. Wrote {} bytes.", + Thread.currentThread().getName(), bytesWritten); + return bytesWritten; + } + } + + /** + * A test tokenizer Mapper. + */ + public static class TokenizerMapper + extends Mapper<Object, Text, Text, LongWritable> { + + private static final LongWritable ONE = new LongWritable(1); + private final Text word = new Text(); + + public void map(Object key, Text value, + Context context) throws IOException, InterruptedException { + StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + context.write(word, ONE); + } + } + } +}
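
Note on the spill-injection hooks: the snippet below is a minimal sketch of how a test is expected to wire up the injector introduced by this patch. It only uses APIs shown above (MRJobConfUtil.initEncryptedIntermediateConfigsForTesting, IntermediateEncryptedStream.setSpillCBInjector/resetSpillCBInjector, and the SpillCallBackPathsFinder accessors); the SleepJob tool and its argument list are borrowed from TestLocalJobSubmission. It is illustrative only and not part of the patch.

    // Enable intermediate (spill) data encryption on a fresh configuration.
    Configuration conf =
        MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
    // Install the path-recording injector before the job runs.
    SpillCallBackPathsFinder spillInjector =
        (SpillCallBackPathsFinder) IntermediateEncryptedStream
            .setSpillCBInjector(new SpillCallBackPathsFinder());
    try {
      int res = ToolRunner.run(conf, new SleepJob(),
          new String[] {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"});
      assertEquals("job should succeed", 0, res);
      // The job must have produced at least one encrypted spill file...
      assertFalse("no encrypted spill files were recorded",
          spillInjector.getEncryptedSpilledFiles().isEmpty());
      // ...and no spill file may be accessed outside an encrypted stream.
      assertTrue("invalid accesses to spill file positions",
          spillInjector.getInvalidSpillEntries().isEmpty());
    } finally {
      // Always restore the default (no-op) injector for later tests.
      IntermediateEncryptedStream.resetSpillCBInjector();
    }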
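
Note on forcing spills: for the encryption assertions to be meaningful, every map task must actually spill. A rough sketch of the sizing logic used in setup() above, with the arithmetic spelled out; the 2x factor and the 200 MB heap headroom are values chosen by this patch, not MapReduce requirements.

    // Each mapper receives about inputFileSize / numMappers bytes of input,
    // while the map-side sort buffer is capped at ioSortMb megabytes.
    // Generating roughly twice mapreduce.task.io.sort.mb of data per map
    // therefore guarantees at least one spill per map task.
    long ioSortMb = 128L;  // mapreduce.task.io.sort.mb
    conf.setLong(MRJobConfig.IO_SORT_MB, ioSortMb);
    // Give the map JVM enough memory to hold the sort buffer, with headroom.
    long mapMb = Math.max(2 * ioSortMb,
        conf.getInt(MRJobConfig.MAP_MEMORY_MB,
            MRJobConfig.DEFAULT_MAP_MEMORY_MB));
    conf.setLong(MRJobConfig.MAP_MEMORY_MB, mapMb);
    conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
    // A single attempt per task keeps the recorded spill paths unambiguous.
    conf.setInt("mapreduce.map.maxattempts", 1);
    conf.setInt("mapreduce.reduce.maxattempts", 1);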