MAPREDUCE-6166. Reducers do not validate checksum of map outputs when fetching directly to disk. (Eric Payne via gera)

(cherry picked from commit af006937e8)
(cherry picked from commit 7c7bccfc31)
(cherry picked from commit 2e9dca584ba25809d9c6269e5f9326e09f55ed99)
Authored by Gera Shegalov on 2014-12-15 19:08:59 -08:00; committed by Vinod Kumar Vavilapalli
parent 1698110acc
commit 1931fa5f4b
3 changed files with 81 additions and 3 deletions
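
In short, the patch makes OnDiskMapOutput.shuffle() validate the IFile checksum while it streams a fetched map output to local disk: the fetched stream is wrapped in an IFileInputStream and the copy loop reads through readWithChecksum(), so a corrupted segment now fails the fetch with a ChecksumException instead of being written to disk unverified. The following is a minimal sketch of that pattern outside the Fetcher machinery; the class and method names (ChecksummedDiskCopy, copyToDisk) are illustrative, not part of the patch, and error handling is simplified.

// Sketch only, not the committed Hadoop code: the same validate-while-copying
// pattern the patch applies inside OnDiskMapOutput.shuffle().
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.IFileInputStream;

class ChecksummedDiskCopy {
  // Copies compressedLength bytes (IFile data plus trailing checksum) from the
  // fetched stream to disk; a corrupt segment fails with
  // org.apache.hadoop.fs.ChecksumException instead of landing on disk silently.
  static void copyToDisk(InputStream input, OutputStream disk,
                         long compressedLength, Configuration conf)
      throws IOException {
    IFileInputStream checked =
        new IFileInputStream(input, compressedLength, conf);
    final int BYTES_TO_READ = 64 * 1024;
    byte[] buf = new byte[BYTES_TO_READ];
    long bytesLeft = compressedLength;
    while (bytesLeft > 0) {
      int n = checked.readWithChecksum(buf, 0,
          (int) Math.min(bytesLeft, BYTES_TO_READ));
      if (n < 0) {
        throw new IOException("read past end of stream");
      }
      disk.write(buf, 0, n);
      bytesLeft -= n;
    }
  }
}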

hadoop-mapreduce-project/CHANGES.txt

@@ -15,6 +15,9 @@ Release 2.6.1 - UNRELEASED
     MAPREDUCE-6237. Multiple mappers with DBInputFormat don't work because of
     reusing conections. (Kannan Rajah via ozawa)
+    MAPREDUCE-6166. Reducers do not validate checksum of map outputs when
+    fetching directly to disk. (Eric Payne via gera)
 Release 2.6.0 - 2014-11-18
   INCOMPATIBLE CHANGES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java

@@ -24,6 +24,7 @@ import java.io.OutputStream;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.IFileInputStream;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapOutputFile;
@@ -52,6 +54,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
   private final MergeManagerImpl<K, V> merger;
   private final OutputStream disk;
   private long compressedSize;
+  private final Configuration conf;
   public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K,V> merger, long size,
@@ -60,7 +63,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
                          int fetcher, boolean primaryMapOutput)
       throws IOException {
     this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
-         primaryMapOutput, FileSystem.getLocal(conf),
+         primaryMapOutput, FileSystem.getLocal(conf).getRaw(),
          mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
   }
@@ -77,6 +80,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
     this.outputPath = outputPath;
     tmpOutputPath = getTempPath(outputPath, fetcher);
     disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
+    this.conf = conf;
   }
   @VisibleForTesting
@@ -89,13 +93,14 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
                       long compressedLength, long decompressedLength,
                       ShuffleClientMetrics metrics,
                       Reporter reporter) throws IOException {
+    input = new IFileInputStream(input, compressedLength, conf);
     // Copy data to local-disk
     long bytesLeft = compressedLength;
     try {
       final int BYTES_TO_READ = 64 * 1024;
       byte[] buf = new byte[BYTES_TO_READ];
       while (bytesLeft > 0) {
-        int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        int n = ((IFileInputStream)input).readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
         if (n < 0) {
           throw new IOException("read past end of stream reading " +
                                 getMapId());
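
A note on the readWithChecksum() call above: the copy loop still has to move the full compressedLength bytes to disk, and that byte count includes the IFile segment's trailing CRC, so the file written to disk remains a complete, re-verifiable IFile for the later merge. As I read the IFileInputStream contract, plain read() validates but returns only the data bytes (it would hit end-of-stream before bytesLeft reached zero), while readWithChecksum() validates and also passes the checksum bytes through. The getRaw() change in the constructor presumably avoids stacking the local ChecksumFileSystem's own .crc files on top of data that already carries IFile checksums. Below is a small sketch of that assumed read contract; ReadContractDemo and drain are hypothetical names, not Hadoop code.

// Sketch of the assumed read contract, not Hadoop source: for an IFile segment
// whose last bytes are its CRC32 checksum, readWithChecksum() hands back every
// byte (so the segment can be copied to disk as a complete, re-verifiable
// IFile), while plain read() stops at the data boundary after validating.
import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.IFileInputStream;

final class ReadContractDemo {
  static long drain(byte[] segment, boolean withChecksum, Configuration conf)
      throws IOException {
    IFileInputStream in = new IFileInputStream(
        new ByteArrayInputStream(segment), segment.length, conf);
    byte[] buf = new byte[8 * 1024];
    long total = 0;
    int n;
    while ((n = withChecksum ? in.readWithChecksum(buf, 0, buf.length)
                             : in.read(buf, 0, buf.length)) > 0) {
      total += n;  // expected: segment.length with checksum; data bytes only without
    }
    in.close();
    return total;
  }
}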

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java

@@ -24,6 +24,7 @@ import java.lang.Void;
 import java.net.HttpURLConnection;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.MapOutputFile;
@@ -54,6 +55,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFileInputStream;
 import org.apache.hadoop.mapred.IFileOutputStream;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -88,6 +90,7 @@ public class TestFetcher {
   final MapHost host = new MapHost("localhost", "http://localhost:8080/");
   final TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
   final TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+  FileSystem fs = null;
   @Rule public TestName name = new TestName();
@@ -118,8 +121,11 @@
   }
   @After
-  public void teardown() {
+  public void teardown() throws IllegalArgumentException, IOException {
     LOG.info("<<<< " + name.getMethodName());
+    if (fs != null) {
+      fs.delete(new Path(name.getMethodName()),true);
+    }
   }
   @Test
@@ -432,6 +438,70 @@
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
   }
+  @Test
+  public void testCorruptedIFile() throws Exception {
+    final int fetcher = 7;
+    Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
+    Path shuffledToDisk =
+        OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
+    fs = FileSystem.getLocal(job).getRaw();
+    MapOutputFile mof = mock(MapOutputFile.class);
+    OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID,
+        id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath);
+    String mapData = "MAPDATA12345678901234567890";
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bout);
+    IFileOutputStream ios = new IFileOutputStream(dos);
+    header.write(dos);
+    int headerSize = dos.size();
+    try {
+      ios.write(mapData.getBytes());
+    } finally {
+      ios.close();
+    }
+    int dataSize = bout.size() - headerSize;
+    // Ensure that the OnDiskMapOutput shuffler can successfully read the data.
+    MapHost host = new MapHost("TestHost", "http://test/url");
+    ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+    try {
+      // Read past the shuffle header.
+      bin.read(new byte[headerSize], 0, headerSize);
+      odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
+    } finally {
+      bin.close();
+    }
+    // Now corrupt the IFile data.
+    byte[] corrupted = bout.toByteArray();
+    corrupted[headerSize + (dataSize / 2)] = 0x0;
+    try {
+      bin = new ByteArrayInputStream(corrupted);
+      // Read past the shuffle header.
+      bin.read(new byte[headerSize], 0, headerSize);
+      odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
+      fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
+    } catch(ChecksumException e) {
+      LOG.info("The expected checksum exception was thrown.", e);
+    } finally {
+      bin.close();
+    }
+    // Ensure that the shuffled file can be read.
+    IFileInputStream iFin = new IFileInputStream(fs.open(shuffledToDisk), dataSize, job);
+    try {
+      iFin.read(new byte[dataSize], 0, dataSize);
+    } finally {
+      iFin.close();
+    }
+  }
   @Test(timeout=10000)
   public void testInterruptInMemory() throws Exception {
     final int FETCHER = 2;
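
The new testCorruptedIFile exercises the check end to end through OnDiskMapOutput. The essence of what it verifies can be reproduced with just the IFile stream classes used in the test: write bytes through IFileOutputStream so a checksum is appended on close(), flip one byte, and expect IFileInputStream to raise a ChecksumException when the bytes are read back. The condensed round trip below is a sketch; the class name CorruptIFileRoundTrip is hypothetical, and the data and corruption offset are arbitrary.

// Sketch only: reproduce the corruption check outside the Fetcher test harness.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.IFileOutputStream;

final class CorruptIFileRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();

    // Write data through IFileOutputStream so a CRC is appended on close().
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    IFileOutputStream ios = new IFileOutputStream(bout);
    ios.write("MAPDATA12345678901234567890".getBytes());
    ios.close();
    byte[] payload = bout.toByteArray();

    // Flip one byte in the middle of the data region.
    payload[payload.length / 2] ^= 0x1;

    // Reading the corrupted payload back through IFileInputStream
    // should fail with a ChecksumException.
    IFileInputStream in = new IFileInputStream(
        new ByteArrayInputStream(payload), payload.length, conf);
    byte[] buf = new byte[payload.length];
    try {
      while (in.readWithChecksum(buf, 0, buf.length) > 0) {
        // drain
      }
      System.out.println("Corruption was NOT detected");
    } catch (ChecksumException expected) {
      System.out.println("Corruption detected: " + expected.getMessage());
    } finally {
      in.close();
    }
  }
}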