MAPREDUCE-5791. Shuffle phase is slow in Windows - FadviseFileRegion::transferTo does not read disks efficiently. Contributed by Nikola Vujic.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1580994 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 2014-03-24 18:57:52 +00:00
parent f89ede8d86
commit a90a5c2452
5 changed files with 286 additions and 4 deletions

hadoop-mapreduce-project/CHANGES.txt

@@ -259,6 +259,10 @@ Release 2.4.0 - UNRELEASED
override HADOOP_ROOT_LOGGER or HADOOP_CLIENT_OPTS. (Varun Vasudev via
vinodkv)
MAPREDUCE-5791. Shuffle phase is slow in Windows -
FadviseFileRegion::transferTo does not read disks efficiently.
(Nikola Vujic via cnauroth)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -342,6 +342,30 @@
</description>
</property>
<property>
<name>mapreduce.shuffle.transferTo.allowed</name>
<value></value>
<description>This option enables or disables use of the NIO transferTo
method in the shuffle phase. NIO transferTo does not perform well on
Windows in the shuffle phase, so this property makes it possible to
disable it, in which case a custom transfer method is used instead. The
recommended value is false when running Hadoop on Windows and true on
Linux. If nothing is set, the default value is false on Windows and true
on Linux.
</description>
</property>
<property>
<name>mapreduce.shuffle.transfer.buffer.size</name>
<value>131072</value>
<description>This property is used only if
mapreduce.shuffle.transferTo.allowed is set to false. In that case,
this property defines the size of the buffer used in the buffer copy code
for the shuffle phase. The size of this buffer determines the size of the IO
requests.
</description>
</property>
<property>
<name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value>
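The two new properties above can also be set programmatically. A minimal hedged sketch, assuming a standard Hadoop client classpath; the values shown are the ones the description recommends for Windows, not anything this patch sets in code:

import org.apache.hadoop.conf.Configuration;

// Illustrative only: override the shuffle transfer settings in code.
Configuration conf = new Configuration();
// Recommended on Windows, where NIO transferTo reads disks inefficiently.
conf.setBoolean("mapreduce.shuffle.transferTo.allowed", false);
// Buffer for the fallback copy path; 131072 (128 KB) is the shipped default.
conf.setInt("mapreduce.shuffle.transfer.buffer.size", 128 * 1024);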

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.mapred;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.commons.logging.Log;
@@ -30,6 +32,8 @@ import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.jboss.netty.channel.DefaultFileRegion;
import com.google.common.annotations.VisibleForTesting;
public class FadvisedFileRegion extends DefaultFileRegion {
private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
@@ -39,18 +43,29 @@ public class FadvisedFileRegion extends DefaultFileRegion {
private final ReadaheadPool readaheadPool;
private final FileDescriptor fd;
private final String identifier;
private final long count;
private final long position;
private final int shuffleBufferSize;
private final boolean shuffleTransferToAllowed;
private final FileChannel fileChannel;
private ReadaheadRequest readaheadRequest;
public FadvisedFileRegion(RandomAccessFile file, long position, long count,
boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
String identifier, int shuffleBufferSize,
boolean shuffleTransferToAllowed) throws IOException {
super(file.getChannel(), position, count);
this.manageOsCache = manageOsCache;
this.readaheadLength = readaheadLength;
this.readaheadPool = readaheadPool;
this.fd = file.getFD();
this.identifier = identifier;
this.fileChannel = file.getChannel();
this.count = count;
this.position = position;
this.shuffleBufferSize = shuffleBufferSize;
this.shuffleTransferToAllowed = shuffleTransferToAllowed;
}
@Override
@@ -61,9 +76,69 @@ public class FadvisedFileRegion extends DefaultFileRegion {
getPosition() + position, readaheadLength,
getPosition() + getCount(), readaheadRequest);
}
if(this.shuffleTransferToAllowed) {
return super.transferTo(target, position);
} else {
return customShuffleTransfer(target, position);
}
}
/**
* This method transfers data using a local buffer. It transfers data from
* the disk to a local buffer in memory, and then from the buffer to the
* target. It is used only if transferTo is disabled in the configuration
* file. super.transferTo does not perform well on Windows because of the
* small IO requests it generates; customShuffleTransfer can control the
* size of the IO requests through the size of the intermediate buffer.
*/
@VisibleForTesting
long customShuffleTransfer(WritableByteChannel target, long position)
throws IOException {
long actualCount = this.count - position;
if (actualCount < 0 || position < 0) {
throw new IllegalArgumentException(
"position out of range: " + position +
" (expected: 0 - " + (this.count - 1) + ')');
}
if (actualCount == 0) {
return 0L;
}
long trans = actualCount;
int readSize;
ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
while(trans > 0L &&
(readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
//adjust counters and buffer limit
if(readSize < trans) {
trans -= readSize;
position += readSize;
byteBuffer.flip();
} else {
//We can read more than we need if actualCount is not a multiple of the
//byteBuffer size and the file is big enough. In that case we cannot use
//flip, so we set the buffer limit to trans manually.
byteBuffer.limit((int)trans);
byteBuffer.position(0);
position += trans;
trans = 0;
}
//write data to the target
while(byteBuffer.hasRemaining()) {
target.write(byteBuffer);
}
byteBuffer.clear();
}
return actualCount - trans;
}
@Override
public void releaseExternalResources() {
if (readaheadRequest != null) {
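For reference, a hedged sketch of how customShuffleTransfer above is driven (the unit test below does the same): position is relative to the start of the region, so the caller advances it by the count each call reports until the method returns 0. The region and target names here are assumptions for illustration:

// Illustrative only: drain a FadvisedFileRegion through the buffer-copy path.
// 'region' is an initialized FadvisedFileRegion with transferTo disabled;
// 'target' is any java.nio.channels.WritableByteChannel.
long pos = 0;
long transferred;
while ((transferred = region.customShuffleTransfer(target, pos)) > 0) {
pos += transferred; // advance the region-relative offset
}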

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java

@@ -74,6 +74,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
@@ -144,6 +145,8 @@ public class ShuffleHandler extends AuxiliaryService {
private boolean manageOsCache;
private int readaheadLength;
private int maxShuffleConnections;
private int shuffleBufferSize;
private boolean shuffleTransferToAllowed;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
public static final String MAPREDUCE_SHUFFLE_SERVICEID =
@@ -183,6 +186,17 @@ public class ShuffleHandler extends AuxiliaryService {
public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
// 0 implies Netty default of 2 * number of available processors
public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
public static final String SHUFFLE_BUFFER_SIZE =
"mapreduce.shuffle.transfer.buffer.size";
public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
public static final String SHUFFLE_TRANSFERTO_ALLOWED =
"mapreduce.shuffle.transferTo.allowed";
public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
false;
boolean connectionKeepAliveEnabled = false;
int connectionKeepAliveTimeOut;
int mapOutputMetaInfoCacheSize;
@@ -310,6 +324,13 @@ public class ShuffleHandler extends AuxiliaryService {
if (maxShuffleThreads == 0) {
maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
}
shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
DEFAULT_SHUFFLE_BUFFER_SIZE);
shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
(Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Boss #%d")
@@ -746,7 +767,8 @@ public class ShuffleHandler extends AuxiliaryService {
if (ch.getPipeline().get(SslHandler.class) == null) {
final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
info.startOffset, info.partLength, manageOsCache, readaheadLength,
readaheadPool, spillfile.getAbsolutePath(),
shuffleBufferSize, shuffleTransferToAllowed);
writeFuture = ch.write(partition);
writeFuture.addListener(new ChannelFutureListener() {
// TODO error handling; distinguish IO/connection failures,

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java (new file)

@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.WritableByteChannel;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
public class TestFadvisedFileRegion {
private final int FILE_SIZE = 16*1024*1024;
private static final Log LOG =
LogFactory.getLog(TestFadvisedFileRegion.class);
@Test(timeout = 100000)
public void testCustomShuffleTransfer() throws IOException {
File absLogDir = new File("target",
TestFadvisedFileRegion.class.getSimpleName() +
"LocDir").getAbsoluteFile();
String testDirPath =
StringUtils.join(Path.SEPARATOR,
new String[] { absLogDir.getAbsolutePath(),
"testCustomShuffleTransfer"});
File testDir = new File(testDirPath);
testDir.mkdirs();
System.out.println(testDir.getAbsolutePath());
File inFile = new File(testDir, "fileIn.out");
File outFile = new File(testDir, "fileOut.out");
//Initialize input file
byte [] initBuff = new byte[FILE_SIZE];
Random rand = new Random();
rand.nextBytes(initBuff);
FileOutputStream out = new FileOutputStream(inFile);
try{
out.write(initBuff);
} finally {
IOUtils.cleanup(LOG, out);
}
//define position and count to read from a file region.
int position = 2*1024*1024;
int count = 4*1024*1024 - 1;
RandomAccessFile inputFile = null;
RandomAccessFile targetFile = null;
WritableByteChannel target = null;
FadvisedFileRegion fileRegion = null;
try {
inputFile = new RandomAccessFile(inFile.getAbsolutePath(), "r");
targetFile = new RandomAccessFile(outFile.getAbsolutePath(), "rw");
target = targetFile.getChannel();
Assert.assertEquals(FILE_SIZE, inputFile.length());
//create FadvisedFileRegion
fileRegion = new FadvisedFileRegion(
inputFile, position, count, false, 0, null, null, 1024, false);
//test corner cases
customShuffleTransferCornerCases(fileRegion, target, count);
long pos = 0;
long size;
while((size = fileRegion.customShuffleTransfer(target, pos)) > 0) {
pos += size;
}
//assert size
Assert.assertEquals(count, (int)pos);
Assert.assertEquals(count, targetFile.length());
} finally {
if (fileRegion != null) {
fileRegion.releaseExternalResources();
}
IOUtils.cleanup(LOG, target);
IOUtils.cleanup(LOG, targetFile);
IOUtils.cleanup(LOG, inputFile);
}
//Read the target file and verify that copy is done correctly
byte [] buff = new byte[FILE_SIZE];
FileInputStream in = new FileInputStream(outFile);
try {
int total = in.read(buff, 0, count);
Assert.assertEquals(count, total);
for(int i = 0; i < count; i++) {
Assert.assertEquals(initBuff[position+i], buff[i]);
}
} finally {
IOUtils.cleanup(LOG, in);
}
//delete files and folders
inFile.delete();
outFile.delete();
testDir.delete();
absLogDir.delete();
}
private static void customShuffleTransferCornerCases(
FadvisedFileRegion fileRegion, WritableByteChannel target, int count) {
try {
fileRegion.customShuffleTransfer(target, -1);
Assert.fail("Expected an IllegalArgumentException");
} catch (IllegalArgumentException ie) {
LOG.info("Expected - illegal argument was passed.");
} catch (Exception e) {
Assert.fail("Expected an IllegalArgumentException");
}
//test a position beyond the end of the region
try {
fileRegion.customShuffleTransfer(target, count + 1);
Assert.fail("Expected an IllegalArgumentException");
} catch (IllegalArgumentException ie) {
LOG.info("Expected - illegal argument was passed.");
} catch (Exception e) {
Assert.fail("Expected an IllegalArgumentException");
}
}
}