From 8fcad7e8e9fd8c80207d9593115901d53b3b7d42 Mon Sep 17 00:00:00 2001
From: Alejandro Abdelnur
Date: Wed, 15 Aug 2012 23:08:40 +0000
Subject: [PATCH] MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1373669 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |  2 +
 .../java/org/apache/hadoop/mapred/IFile.java  |  2 +-
 .../hadoop/mapred/IFileInputStream.java       | 63 +++++++++++++++++--
 .../org/apache/hadoop/mapreduce/MRConfig.java | 18 +++++-
 .../hadoop/mapreduce/task/reduce/Fetcher.java |  5 +-
 .../src/main/resources/mapred-default.xml     | 14 +++++
 .../hadoop/mapred/TestIFileStreams.java       |  7 ++-
 7 files changed, 101 insertions(+), 10 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9f3ddc0ec8b..71361695b4e 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -142,6 +142,8 @@ Branch-2 ( Unreleased changes )
     MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved
     (Jason Lowe via bobby)
 
+    MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
+
   BUG FIXES
 
     MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
index 936cfc06c39..a410c975578 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
@@ -340,7 +340,7 @@ public class IFile {
                   CompressionCodec codec,
                   Counters.Counter readsCounter) throws IOException {
       readRecordsCounter = readsCounter;
-      checksumIn = new IFileInputStream(in,length);
+      checksumIn = new IFileInputStream(in,length, conf);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         if (decompressor != null) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java
index 734b33a73f5..b171fb0e474 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java
@@ -19,13 +19,22 @@ package org.apache.hadoop.mapred;
 
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
  * A checksum input stream, used for IFiles.
@@ -35,7 +44,8 @@ import org.apache.hadoop.util.DataChecksum;
 @InterfaceStability.Unstable
 public class IFileInputStream extends InputStream {
   
-  private final InputStream in; //The input stream to be verified for checksum. 
+  private final InputStream in; //The input stream to be verified for checksum.
+  private final FileDescriptor inFd; // the file descriptor, if it is known
   private final long length; //The total length of the input file
   private final long dataLength;
   private DataChecksum sum;
@@ -43,7 +53,14 @@ public class IFileInputStream extends InputStream {
   private final byte b[] = new byte[1];
   private byte csum[] = null;
   private int checksumSize;
-  
+
+  private ReadaheadRequest curReadahead = null;
+  private ReadaheadPool raPool = ReadaheadPool.getInstance();
+  private boolean readahead;
+  private int readaheadLength;
+
+  public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
   private boolean disableChecksumValidation = false;
 
   /**
@@ -51,13 +68,36 @@ public class IFileInputStream extends InputStream {
    * @param in The input stream to be verified for checksum.
    * @param len The length of the input stream including checksum bytes.
    */
-  public IFileInputStream(InputStream in, long len) {
+  public IFileInputStream(InputStream in, long len, Configuration conf) {
     this.in = in;
+    this.inFd = getFileDescriptorIfAvail(in);
     sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
         Integer.MAX_VALUE);
     checksumSize = sum.getChecksumSize();
     length = len;
     dataLength = length - checksumSize;
+
+    conf = (conf != null) ? conf : new Configuration();
+    readahead = conf.getBoolean(MRConfig.MAPRED_IFILE_READAHEAD,
+        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD);
+    readaheadLength = conf.getInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES,
+        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
+
+    doReadahead();
+  }
+
+  private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+    FileDescriptor fd = null;
+    try {
+      if (in instanceof HasFileDescriptor) {
+        fd = ((HasFileDescriptor)in).getFileDescriptor();
+      } else if (in instanceof FileInputStream) {
+        fd = ((FileInputStream)in).getFD();
+      }
+    } catch (IOException e) {
+      LOG.info("Unable to determine FileDescriptor", e);
+    }
+    return fd;
   }
 
   /**
@@ -66,6 +106,10 @@ public class IFileInputStream extends InputStream {
    */
   @Override
   public void close() throws IOException {
+
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
     if (currentOffset < dataLength) {
       byte[] t = new byte[Math.min((int)
             (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@@ -102,10 +146,21 @@ public class IFileInputStream extends InputStream {
     if (currentOffset >= dataLength) {
       return -1;
     }
-    
+
+    doReadahead();
+
     return doRead(b,off,len);
   }
 
+  private void doReadahead() {
+    if (raPool != null && inFd != null && readahead) {
+      curReadahead = raPool.readaheadStream(
+          "ifile", inFd,
+          currentOffset, readaheadLength, dataLength,
+          curReadahead);
+    }
+  }
+
   /**
    * Read bytes from the stream.
   * At EOF, checksum is validated and sent back
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
index fb9a1ff6f67..d758e00483e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
@@ -84,4 +84,20 @@ public interface MRConfig {
     "mapreduce.shuffle.ssl.enabled";
 
   public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
-}
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String MAPRED_IFILE_READAHEAD =
+    "mapreduce.ifile.readahead";
+
+  public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+   */
+  public static final String MAPRED_IFILE_READAHEAD_BYTES =
+    "mapreduce.ifile.readahead.bytes";
+
+  public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+    4 * 1024 * 1024;}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 6c527ae1ce5..27c7a49069e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -98,6 +98,8 @@ class Fetcher<K,V> extends Thread {
 
   private volatile boolean stopped = false;
 
+  private JobConf job;
+
   private static boolean sslShuffle;
   private static SSLFactory sslFactory;
 
@@ -105,6 +107,7 @@ class Fetcher<K,V> extends Thread {
                  ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
                  Reporter reporter, ShuffleClientMetrics metrics,
                  ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
+    this.job = job;
     this.reporter = reporter;
     this.scheduler = scheduler;
    this.merger = merger;
@@ -539,7 +542,7 @@ class Fetcher<K,V> extends Thread {
                                int decompressedLength,
                                int compressedLength) throws IOException {
     IFileInputStream checksumIn =
-      new IFileInputStream(input, compressedLength);
+      new IFileInputStream(input, compressedLength, job);
 
     input = checksumIn;
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 5fee954bbaf..b2b1f061c84 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -959,6 +959,20 @@
   acceptable.
   </description>
 </property>
+
+<property>
+  <name>mapreduce.ifile.readahead</name>
+  <value>true</value>
+  <description>Configuration key to enable/disable IFile readahead.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.ifile.readahead.bytes</name>
+  <value>4194304</value>
+  <description>Configuration key to set the IFile readahead length in bytes.
+  </description>
+</property>
 
 <property>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java
index f2a19f6a55f..86431e5c135 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,7 +36,7 @@ public class TestIFileStreams extends TestCase {
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     for (int i = 0; i < DLEN; ++i) {
       assertEquals(i, ifis.read());
     }
@@ -54,7 +55,7 @@ public class TestIFileStreams extends TestCase {
     final byte[] b = dob.getData();
     ++b[17];
     dib.reset(b, DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     int i = 0;
     try {
       while (i < DLEN) {
@@ -83,7 +84,7 @@ public class TestIFileStreams extends TestCase {
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 100);
+    IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
     int i = 0;
     try {
       while (i < DLEN - 8) {
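
Usage note (not part of the patch): a minimal sketch, using the standard org.apache.hadoop.conf.Configuration API, of how a job could override the two configuration keys introduced above. The class name and the 8 MB value are hypothetical, chosen only for illustration; the defaults come from MRConfig as added by this patch.

import org.apache.hadoop.conf.Configuration;

public class IFileReadaheadConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Keys added by this patch (see MRConfig.MAPRED_IFILE_READAHEAD and
    // MRConfig.MAPRED_IFILE_READAHEAD_BYTES).
    conf.setBoolean("mapreduce.ifile.readahead", true);
    // Illustrative override: raise the readahead window from the 4 MB default to 8 MB.
    conf.setInt("mapreduce.ifile.readahead.bytes", 8 * 1024 * 1024);
    System.out.println("ifile readahead bytes = "
        + conf.getInt("mapreduce.ifile.readahead.bytes", 4 * 1024 * 1024));
  }
}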