MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1373671 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-08-15 23:11:00 +00:00
parent 3f1998b7ba
commit fc783dcc1e
7 changed files with 101 additions and 10 deletions

View File

@ -16,6 +16,8 @@ Branch-2 ( Unreleased changes )
MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved
(Jason Lowe via bobby) (Jason Lowe via bobby)
MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
BUG FIXES BUG FIXES
MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in

View File

@ -340,7 +340,7 @@ public class IFile {
CompressionCodec codec, CompressionCodec codec,
Counters.Counter readsCounter) throws IOException { Counters.Counter readsCounter) throws IOException {
readRecordsCounter = readsCounter; readRecordsCounter = readsCounter;
checksumIn = new IFileInputStream(in,length); checksumIn = new IFileInputStream(in,length, conf);
if (codec != null) { if (codec != null) {
decompressor = CodecPool.getDecompressor(codec); decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) { if (decompressor != null) {

View File

@ -19,13 +19,22 @@
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
import java.io.EOFException; import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
/** /**
* A checksum input stream, used for IFiles. * A checksum input stream, used for IFiles.
@ -36,6 +45,7 @@ import org.apache.hadoop.util.DataChecksum;
public class IFileInputStream extends InputStream { public class IFileInputStream extends InputStream {
private final InputStream in; //The input stream to be verified for checksum. private final InputStream in; //The input stream to be verified for checksum.
private final FileDescriptor inFd; // the file descriptor, if it is known
private final long length; //The total length of the input file private final long length; //The total length of the input file
private final long dataLength; private final long dataLength;
private DataChecksum sum; private DataChecksum sum;
@ -44,6 +54,13 @@ public class IFileInputStream extends InputStream {
private byte csum[] = null; private byte csum[] = null;
private int checksumSize; private int checksumSize;
private ReadaheadRequest curReadahead = null;
private ReadaheadPool raPool = ReadaheadPool.getInstance();
private boolean readahead;
private int readaheadLength;
public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
private boolean disableChecksumValidation = false; private boolean disableChecksumValidation = false;
/** /**
@ -51,13 +68,36 @@ public class IFileInputStream extends InputStream {
* @param in The input stream to be verified for checksum. * @param in The input stream to be verified for checksum.
* @param len The length of the input stream including checksum bytes. * @param len The length of the input stream including checksum bytes.
*/ */
public IFileInputStream(InputStream in, long len) { public IFileInputStream(InputStream in, long len, Configuration conf) {
this.in = in; this.in = in;
this.inFd = getFileDescriptorIfAvail(in);
sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
Integer.MAX_VALUE); Integer.MAX_VALUE);
checksumSize = sum.getChecksumSize(); checksumSize = sum.getChecksumSize();
length = len; length = len;
dataLength = length - checksumSize; dataLength = length - checksumSize;
conf = (conf != null) ? conf : new Configuration();
readahead = conf.getBoolean(MRConfig.MAPRED_IFILE_READAHEAD,
MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD);
readaheadLength = conf.getInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES,
MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
doReadahead();
}
private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
FileDescriptor fd = null;
try {
if (in instanceof HasFileDescriptor) {
fd = ((HasFileDescriptor)in).getFileDescriptor();
} else if (in instanceof FileInputStream) {
fd = ((FileInputStream)in).getFD();
}
} catch (IOException e) {
LOG.info("Unable to determine FileDescriptor", e);
}
return fd;
} }
/** /**
@ -66,6 +106,10 @@ public class IFileInputStream extends InputStream {
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (curReadahead != null) {
curReadahead.cancel();
}
if (currentOffset < dataLength) { if (currentOffset < dataLength) {
byte[] t = new byte[Math.min((int) byte[] t = new byte[Math.min((int)
(Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)]; (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@ -103,9 +147,20 @@ public class IFileInputStream extends InputStream {
return -1; return -1;
} }
doReadahead();
return doRead(b,off,len); return doRead(b,off,len);
} }
private void doReadahead() {
if (raPool != null && inFd != null && readahead) {
curReadahead = raPool.readaheadStream(
"ifile", inFd,
currentOffset, readaheadLength, dataLength,
curReadahead);
}
}
/** /**
* Read bytes from the stream. * Read bytes from the stream.
* At EOF, checksum is validated and sent back * At EOF, checksum is validated and sent back

View File

@ -84,4 +84,20 @@ public interface MRConfig {
"mapreduce.shuffle.ssl.enabled"; "mapreduce.shuffle.ssl.enabled";
public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false; public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
}
/**
* Configuration key to enable/disable IFile readahead.
*/
public static final String MAPRED_IFILE_READAHEAD =
"mapreduce.ifile.readahead";
public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
/**
* Configuration key to set the IFile readahead length in bytes.
*/
public static final String MAPRED_IFILE_READAHEAD_BYTES =
"mapreduce.ifile.readahead.bytes";
public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
4 * 1024 * 1024;}

View File

@ -98,6 +98,8 @@ class Fetcher<K,V> extends Thread {
private volatile boolean stopped = false; private volatile boolean stopped = false;
private JobConf job;
private static boolean sslShuffle; private static boolean sslShuffle;
private static SSLFactory sslFactory; private static SSLFactory sslFactory;
@ -105,6 +107,7 @@ class Fetcher<K,V> extends Thread {
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics, Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) { ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
this.job = job;
this.reporter = reporter; this.reporter = reporter;
this.scheduler = scheduler; this.scheduler = scheduler;
this.merger = merger; this.merger = merger;
@ -539,7 +542,7 @@ class Fetcher<K,V> extends Thread {
int decompressedLength, int decompressedLength,
int compressedLength) throws IOException { int compressedLength) throws IOException {
IFileInputStream checksumIn = IFileInputStream checksumIn =
new IFileInputStream(input, compressedLength); new IFileInputStream(input, compressedLength, job);
input = checksumIn; input = checksumIn;

View File

@ -960,6 +960,20 @@
</description> </description>
</property> </property>
<property>
<name>mapreduce.ifile.readahead</name>
<value>true</value>
<description>Configuration key to enable/disable IFile readahead.
</description>
</property>
<property>
<name>mapreduce.ifile.readahead.bytes</name>
<value>4194304</value>
<description>Configuration key to set the IFile readahead length in bytes.
</description>
</property>
<!-- Job Notification Configuration --> <!-- Job Notification Configuration -->
<!-- <!--

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
@ -35,7 +36,7 @@ public class TestIFileStreams extends TestCase {
ifos.close(); ifos.close();
DataInputBuffer dib = new DataInputBuffer(); DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), DLEN + 4); dib.reset(dob.getData(), DLEN + 4);
IFileInputStream ifis = new IFileInputStream(dib, 104); IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
for (int i = 0; i < DLEN; ++i) { for (int i = 0; i < DLEN; ++i) {
assertEquals(i, ifis.read()); assertEquals(i, ifis.read());
} }
@ -54,7 +55,7 @@ public class TestIFileStreams extends TestCase {
final byte[] b = dob.getData(); final byte[] b = dob.getData();
++b[17]; ++b[17];
dib.reset(b, DLEN + 4); dib.reset(b, DLEN + 4);
IFileInputStream ifis = new IFileInputStream(dib, 104); IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
int i = 0; int i = 0;
try { try {
while (i < DLEN) { while (i < DLEN) {
@ -83,7 +84,7 @@ public class TestIFileStreams extends TestCase {
ifos.close(); ifos.close();
DataInputBuffer dib = new DataInputBuffer(); DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), DLEN + 4); dib.reset(dob.getData(), DLEN + 4);
IFileInputStream ifis = new IFileInputStream(dib, 100); IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
int i = 0; int i = 0;
try { try {
while (i < DLEN - 8) { while (i < DLEN - 8) {