MAPREDUCE-6166. Reducers do not validate checksum of map outputs when fetching directly to disk. (Eric Payne via gera)

(cherry picked from commit af006937e8)
This commit is contained in:
Gera Shegalov 2014-12-15 19:08:59 -08:00
parent 19c8f7b72c
commit 7c7bccfc31
3 changed files with 81 additions and 3 deletions

View File

@ -52,6 +52,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-4879. TeraOutputFormat may overwrite an existing output MAPREDUCE-4879. TeraOutputFormat may overwrite an existing output
directory. (gera) directory. (gera)
MAPREDUCE-6166. Reducers do not validate checksum of map outputs when
fetching directly to disk. (Eric Payne via gera)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -24,6 +24,7 @@ import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -32,6 +33,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapred.MapOutputFile;
@ -52,6 +54,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
private final MergeManagerImpl<K, V> merger; private final MergeManagerImpl<K, V> merger;
private final OutputStream disk; private final OutputStream disk;
private long compressedSize; private long compressedSize;
private final Configuration conf;
public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId, public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
MergeManagerImpl<K,V> merger, long size, MergeManagerImpl<K,V> merger, long size,
@ -60,7 +63,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
int fetcher, boolean primaryMapOutput) int fetcher, boolean primaryMapOutput)
throws IOException { throws IOException {
this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher, this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
primaryMapOutput, FileSystem.getLocal(conf), primaryMapOutput, FileSystem.getLocal(conf).getRaw(),
mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size)); mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
} }
@ -77,6 +80,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
this.outputPath = outputPath; this.outputPath = outputPath;
tmpOutputPath = getTempPath(outputPath, fetcher); tmpOutputPath = getTempPath(outputPath, fetcher);
disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath)); disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
this.conf = conf;
} }
@VisibleForTesting @VisibleForTesting
@ -89,13 +93,14 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
long compressedLength, long decompressedLength, long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics, ShuffleClientMetrics metrics,
Reporter reporter) throws IOException { Reporter reporter) throws IOException {
input = new IFileInputStream(input, compressedLength, conf);
// Copy data to local-disk // Copy data to local-disk
long bytesLeft = compressedLength; long bytesLeft = compressedLength;
try { try {
final int BYTES_TO_READ = 64 * 1024; final int BYTES_TO_READ = 64 * 1024;
byte[] buf = new byte[BYTES_TO_READ]; byte[] buf = new byte[BYTES_TO_READ];
while (bytesLeft > 0) { while (bytesLeft > 0) {
int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); int n = ((IFileInputStream)input).readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) { if (n < 0) {
throw new IOException("read past end of stream reading " + throw new IOException("read past end of stream reading " +
getMapId()); getMapId());

View File

@ -24,6 +24,7 @@ import java.lang.Void;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapred.MapOutputFile;
@ -54,6 +55,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.IFileOutputStream; import org.apache.hadoop.mapred.IFileOutputStream;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Reporter;
@ -88,6 +90,7 @@ public class TestFetcher {
final MapHost host = new MapHost("localhost", "http://localhost:8080/"); final MapHost host = new MapHost("localhost", "http://localhost:8080/");
final TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1"); final TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
final TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1"); final TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
FileSystem fs = null;
@Rule public TestName name = new TestName(); @Rule public TestName name = new TestName();
@ -118,8 +121,11 @@ public class TestFetcher {
} }
@After @After
public void teardown() { public void teardown() throws IllegalArgumentException, IOException {
LOG.info("<<<< " + name.getMethodName()); LOG.info("<<<< " + name.getMethodName());
if (fs != null) {
fs.delete(new Path(name.getMethodName()),true);
}
} }
@Test @Test
@ -432,6 +438,70 @@ public class TestFetcher {
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
} }
@Test
public void testCorruptedIFile() throws Exception {
final int fetcher = 7;
Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
Path shuffledToDisk =
OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
fs = FileSystem.getLocal(job).getRaw();
MapOutputFile mof = mock(MapOutputFile.class);
OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID,
id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath);
String mapData = "MAPDATA12345678901234567890";
ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bout);
IFileOutputStream ios = new IFileOutputStream(dos);
header.write(dos);
int headerSize = dos.size();
try {
ios.write(mapData.getBytes());
} finally {
ios.close();
}
int dataSize = bout.size() - headerSize;
// Ensure that the OnDiskMapOutput shuffler can successfully read the data.
MapHost host = new MapHost("TestHost", "http://test/url");
ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
try {
// Read past the shuffle header.
bin.read(new byte[headerSize], 0, headerSize);
odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
} finally {
bin.close();
}
// Now corrupt the IFile data.
byte[] corrupted = bout.toByteArray();
corrupted[headerSize + (dataSize / 2)] = 0x0;
try {
bin = new ByteArrayInputStream(corrupted);
// Read past the shuffle header.
bin.read(new byte[headerSize], 0, headerSize);
odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
} catch(ChecksumException e) {
LOG.info("The expected checksum exception was thrown.", e);
} finally {
bin.close();
}
// Ensure that the shuffled file can be read.
IFileInputStream iFin = new IFileInputStream(fs.open(shuffledToDisk), dataSize, job);
try {
iFin.read(new byte[dataSize], 0, dataSize);
} finally {
iFin.close();
}
}
@Test(timeout=10000) @Test(timeout=10000)
public void testInterruptInMemory() throws Exception { public void testInterruptInMemory() throws Exception {
final int FETCHER = 2; final int FETCHER = 2;