From af006937e8ba82f98f468dc7375fe89c2e0a7912 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Mon, 15 Dec 2014 19:08:59 -0800
Subject: [PATCH] MAPREDUCE-6166. Reducers do not validate checksum of map
 outputs when fetching directly to disk. (Eric Payne via gera)

---
 hadoop-mapreduce-project/CHANGES.txt          |  3 +
 .../task/reduce/OnDiskMapOutput.java          |  9 ++-
 .../mapreduce/task/reduce/TestFetcher.java    | 72 ++++++++++++++++++-
 3 files changed, 81 insertions(+), 3 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 191526a85b3..945d95c4785 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -279,6 +279,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-4879. TeraOutputFormat may overwrite an existing output
     directory. (gera)
 
+    MAPREDUCE-6166. Reducers do not validate checksum of map outputs when
+    fetching directly to disk. (Eric Payne via gera)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index 6e0e92bd4df..8275fd0eba3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -24,6 +24,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +33,7 @@
 
 import org.apache.hadoop.io.IOUtils;
 
+import org.apache.hadoop.mapred.IFileInputStream;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapOutputFile;
@@ -52,6 +54,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
   private final MergeManagerImpl<K, V> merger;
   private final OutputStream disk;
   private long compressedSize;
+  private final Configuration conf;
 
   public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K,V> merger, long size,
@@ -60,7 +63,7 @@ public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          int fetcher, boolean primaryMapOutput)
       throws IOException {
     this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
-         primaryMapOutput, FileSystem.getLocal(conf),
+         primaryMapOutput, FileSystem.getLocal(conf).getRaw(),
          mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
   }
 
@@ -77,6 +80,7 @@ public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
     this.outputPath = outputPath;
     tmpOutputPath = getTempPath(outputPath, fetcher);
     disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
+    this.conf = conf;
   }
 
   @VisibleForTesting
@@ -89,13 +93,14 @@ public void shuffle(MapHost host, InputStream input,
                       long compressedLength, long decompressedLength,
                       ShuffleClientMetrics metrics,
                       Reporter reporter) throws IOException {
+    input = new IFileInputStream(input, compressedLength, conf);
     // Copy data to local-disk
     long bytesLeft = compressedLength;
     try {
       final int BYTES_TO_READ = 64 * 1024;
       byte[] buf = new byte[BYTES_TO_READ];
       while (bytesLeft > 0) {
-        int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        int n = ((IFileInputStream)input).readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
         if (n < 0) {
           throw new IOException("read past end of stream reading " +
                                 getMapId());
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 7736c4854ff..929c0ae3352 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -24,6 +24,7 @@
 
 import java.net.HttpURLConnection;
 
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.MapOutputFile;
@@ -54,6 +55,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFileInputStream;
 import org.apache.hadoop.mapred.IFileOutputStream;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -88,6 +90,7 @@ public class TestFetcher {
   final MapHost host = new MapHost("localhost", "http://localhost:8080/");
   final TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
   final TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+  FileSystem fs = null;
 
   @Rule public TestName name = new TestName();
 
@@ -118,8 +121,11 @@ public void setup() {
   }
 
   @After
-  public void teardown() {
+  public void teardown() throws IllegalArgumentException, IOException {
     LOG.info("<<<< " + name.getMethodName());
+    if (fs != null) {
+      fs.delete(new Path(name.getMethodName()),true);
+    }
   }
 
   @Test
@@ -432,6 +438,70 @@ public void testCopyFromHostExtraBytes() throws Exception {
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
   }
 
+  @Test
+  public void testCorruptedIFile() throws Exception {
+    final int fetcher = 7;
+    Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
+    Path shuffledToDisk =
+        OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
+    fs = FileSystem.getLocal(job).getRaw();
+    MapOutputFile mof = mock(MapOutputFile.class);
+    OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID,
+        id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath);
+
+    String mapData = "MAPDATA12345678901234567890";
+
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bout);
+    IFileOutputStream ios = new IFileOutputStream(dos);
+    header.write(dos);
+
+    int headerSize = dos.size();
+    try {
+      ios.write(mapData.getBytes());
+    } finally {
+      ios.close();
+    }
+
+    int dataSize = bout.size() - headerSize;
+
+    // Ensure that the OnDiskMapOutput shuffler can successfully read the data.
+    MapHost host = new MapHost("TestHost", "http://test/url");
+    ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+    try {
+      // Read past the shuffle header.
+      bin.read(new byte[headerSize], 0, headerSize);
+      odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
+    } finally {
+      bin.close();
+    }
+
+    // Now corrupt the IFile data.
+    byte[] corrupted = bout.toByteArray();
+    corrupted[headerSize + (dataSize / 2)] = 0x0;
+
+    try {
+      bin = new ByteArrayInputStream(corrupted);
+      // Read past the shuffle header.
+      bin.read(new byte[headerSize], 0, headerSize);
+      odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
+      fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
+    } catch(ChecksumException e) {
+      LOG.info("The expected checksum exception was thrown.", e);
+    } finally {
+      bin.close();
+    }
+
+    // Ensure that the shuffled file can be read.
+    IFileInputStream iFin = new IFileInputStream(fs.open(shuffledToDisk), dataSize, job);
+    try {
+      iFin.read(new byte[dataSize], 0, dataSize);
+    } finally {
+      iFin.close();
+    }
+  }
+
   @Test(timeout=10000)
   public void testInterruptInMemory() throws Exception {
     final int FETCHER = 2;