diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2291e58d76a..0e672144693 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -259,6 +259,10 @@ Release 2.4.0 - UNRELEASED
override HADOOP_ROOT_LOGGER or HADOOP_CLIENT_OPTS. (Varun Vasudev via
vinodkv)
+ MAPREDUCE-5791. Shuffle phase is slow in Windows -
+ FadviseFileRegion::transferTo does not read disks efficiently.
+ (Nikola Vujic via cnauroth)
+
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index a849347fa69..3c6bf239ae0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -342,6 +342,30 @@
+
+ mapreduce.shuffle.transferTo.allowed
+
+ This option enables or disables the use of the NIO transferTo method
+ during the shuffle phase. NIO transferTo does not perform well on Windows,
+ so this property makes it possible to disable it, in which case a custom
+ transfer method is used instead. The recommended value is false when
+ running Hadoop on Windows and true on Linux. If nothing is set, the
+ default value is false for Windows and true for Linux.
+
+
+
+
+ mapreduce.shuffle.transfer.buffer.size
+ 131072
+ This property is used only when
+ mapreduce.shuffle.transferTo.allowed is set to false. In that case it
+ defines the size of the intermediate buffer used by the buffer-copy code
+ in the shuffle phase. The size of this buffer determines the size of the
+ IO requests issued to disk.
+
+
+
mapreduce.reduce.markreset.buffer.percent
0.0
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
index b9d9a1152e4..f1acfff9b25 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapred;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.commons.logging.Log;
@@ -30,6 +32,8 @@ import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.jboss.netty.channel.DefaultFileRegion;
+import com.google.common.annotations.VisibleForTesting;
+
public class FadvisedFileRegion extends DefaultFileRegion {
private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
@@ -39,18 +43,29 @@ public class FadvisedFileRegion extends DefaultFileRegion {
private final ReadaheadPool readaheadPool;
private final FileDescriptor fd;
private final String identifier;
-
+ private final long count;
+ private final long position;
+ private final int shuffleBufferSize;
+ private final boolean shuffleTransferToAllowed;
+ private final FileChannel fileChannel;
+
private ReadaheadRequest readaheadRequest;
public FadvisedFileRegion(RandomAccessFile file, long position, long count,
boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
- String identifier) throws IOException {
+ String identifier, int shuffleBufferSize,
+ boolean shuffleTransferToAllowed) throws IOException {
super(file.getChannel(), position, count);
this.manageOsCache = manageOsCache;
this.readaheadLength = readaheadLength;
this.readaheadPool = readaheadPool;
this.fd = file.getFD();
this.identifier = identifier;
+ this.fileChannel = file.getChannel();
+ this.count = count;
+ this.position = position;
+ this.shuffleBufferSize = shuffleBufferSize;
+ this.shuffleTransferToAllowed = shuffleTransferToAllowed;
}
@Override
@@ -61,9 +76,69 @@ public class FadvisedFileRegion extends DefaultFileRegion {
getPosition() + position, readaheadLength,
getPosition() + getCount(), readaheadRequest);
}
- return super.transferTo(target, position);
+
+ if(this.shuffleTransferToAllowed) {
+ return super.transferTo(target, position);
+ } else {
+ return customShuffleTransfer(target, position);
+ }
}
+ /**
+ * This method transfers data using local buffer. It transfers data from
+ * a disk to a local buffer in memory, and then it transfers data from the
+ * buffer to the target. This is used only if transferTo is disallowed in
+ * the configuration file. super.TransferTo does not perform well on Windows
+ * due to a small IO request generated. customShuffleTransfer can control
+ * the size of the IO requests by changing the size of the intermediate
+ * buffer.
+ */
+ @VisibleForTesting
+ long customShuffleTransfer(WritableByteChannel target, long position)
+ throws IOException {
+ long actualCount = this.count - position;
+ if (actualCount < 0 || position < 0) {
+ throw new IllegalArgumentException(
+ "position out of range: " + position +
+ " (expected: 0 - " + (this.count - 1) + ')');
+ }
+ if (actualCount == 0) {
+ return 0L;
+ }
+
+ long trans = actualCount;
+ int readSize;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+
+ while(trans > 0L &&
+ (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+ //adjust counters and buffer limit
+ if(readSize < trans) {
+ trans -= readSize;
+ position += readSize;
+ byteBuffer.flip();
+ } else {
+ //We can read more than we need if the actualCount is not multiple
+ //of the byteBuffer size and file is big enough. In that case we cannot
+ //use flip method but we need to set buffer limit manually to trans.
+ byteBuffer.limit((int)trans);
+ byteBuffer.position(0);
+ position += trans;
+ trans = 0;
+ }
+
+ //write data to the target
+ while(byteBuffer.hasRemaining()) {
+ target.write(byteBuffer);
+ }
+
+ byteBuffer.clear();
+ }
+
+ return actualCount - trans;
+ }
+
+
@Override
public void releaseExternalResources() {
if (readaheadRequest != null) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 6c0a74f6988..c803a7f1634 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
@@ -144,6 +145,8 @@ public class ShuffleHandler extends AuxiliaryService {
private boolean manageOsCache;
private int readaheadLength;
private int maxShuffleConnections;
+ private int shuffleBufferSize;
+ private boolean shuffleTransferToAllowed;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
public static final String MAPREDUCE_SHUFFLE_SERVICEID =
@@ -183,6 +186,17 @@ public class ShuffleHandler extends AuxiliaryService {
public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
// 0 implies Netty default of 2 * number of available processors
public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
+
+ public static final String SHUFFLE_BUFFER_SIZE =
+ "mapreduce.shuffle.transfer.buffer.size";
+ public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+
+ public static final String SHUFFLE_TRANSFERTO_ALLOWED =
+ "mapreduce.shuffle.transferTo.allowed";
+ public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
+ public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
+ false;
+
boolean connectionKeepAliveEnabled = false;
int connectionKeepAliveTimeOut;
int mapOutputMetaInfoCacheSize;
@@ -310,6 +324,13 @@ public class ShuffleHandler extends AuxiliaryService {
if (maxShuffleThreads == 0) {
maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
}
+
+ shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
+ DEFAULT_SHUFFLE_BUFFER_SIZE);
+
+ shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
+ (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
+ DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Boss #%d")
@@ -746,7 +767,8 @@ public class ShuffleHandler extends AuxiliaryService {
if (ch.getPipeline().get(SslHandler.class) == null) {
final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
info.startOffset, info.partLength, manageOsCache, readaheadLength,
- readaheadPool, spillfile.getAbsolutePath());
+ readaheadPool, spillfile.getAbsolutePath(),
+ shuffleBufferSize, shuffleTransferToAllowed);
writeFuture = ch.write(partition);
writeFuture.addListener(new ChannelFutureListener() {
// TODO error handling; distinguish IO/connection failures,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java
new file mode 100644
index 00000000000..13d260ff7f2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.WritableByteChannel;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFadvisedFileRegion {
+ private final int FILE_SIZE = 16*1024*1024;
+ private static final Log LOG =
+ LogFactory.getLog(TestFadvisedFileRegion.class);
+
+ @Test(timeout = 100000)
+ public void testCustomShuffleTransfer() throws IOException {
+ File absLogDir = new File("target",
+ TestFadvisedFileRegion.class.getSimpleName() +
+ "LocDir").getAbsoluteFile();
+
+ String testDirPath =
+ StringUtils.join(Path.SEPARATOR,
+ new String[] { absLogDir.getAbsolutePath(),
+ "testCustomShuffleTransfer"});
+ File testDir = new File(testDirPath);
+ testDir.mkdirs();
+
+ System.out.println(testDir.getAbsolutePath());
+
+ File inFile = new File(testDir, "fileIn.out");
+ File outFile = new File(testDir, "fileOut.out");
+
+
+ //Initialize input file
+ byte [] initBuff = new byte[FILE_SIZE];
+ Random rand = new Random();
+ rand.nextBytes(initBuff);
+
+ FileOutputStream out = new FileOutputStream(inFile);
+ try{
+ out.write(initBuff);
+ } finally {
+ IOUtils.cleanup(LOG, out);
+ }
+
+
+ //define position and count to read from a file region.
+ int position = 2*1024*1024;
+ int count = 4*1024*1024 - 1;
+
+ RandomAccessFile inputFile = null;
+ RandomAccessFile targetFile = null;
+ WritableByteChannel target = null;
+ FadvisedFileRegion fileRegion = null;
+
+ try {
+ inputFile = new RandomAccessFile(inFile.getAbsolutePath(), "r");
+ targetFile = new RandomAccessFile(outFile.getAbsolutePath(), "rw");
+ target = targetFile.getChannel();
+
+ Assert.assertEquals(FILE_SIZE, inputFile.length());
+
+ //create FadvisedFileRegion
+ fileRegion = new FadvisedFileRegion(
+ inputFile, position, count, false, 0, null, null, 1024, false);
+
+ //test corner cases
+ customShuffleTransferCornerCases(fileRegion, target, count);
+
+ long pos = 0;
+ long size;
+ while((size = fileRegion.customShuffleTransfer(target, pos)) > 0) {
+ pos += size;
+ }
+
+ //assert size
+ Assert.assertEquals(count, (int)pos);
+ Assert.assertEquals(count, targetFile.length());
+ } finally {
+ if (fileRegion != null) {
+ fileRegion.releaseExternalResources();
+ }
+ IOUtils.cleanup(LOG, target);
+ IOUtils.cleanup(LOG, targetFile);
+ IOUtils.cleanup(LOG, inputFile);
+ }
+
+ //Read the target file and verify that copy is done correctly
+ byte [] buff = new byte[FILE_SIZE];
+ FileInputStream in = new FileInputStream(outFile);
+ try {
+ int total = in.read(buff, 0, count);
+
+ Assert.assertEquals(count, total);
+
+ for(int i = 0; i < count; i++) {
+ Assert.assertEquals(initBuff[position+i], buff[i]);
+ }
+ } finally {
+ IOUtils.cleanup(LOG, in);
+ }
+
+ //delete files and folders
+ inFile.delete();
+ outFile.delete();
+ testDir.delete();
+ absLogDir.delete();
+ }
+
+ private static void customShuffleTransferCornerCases(
+ FadvisedFileRegion fileRegion, WritableByteChannel target, int count) {
+ try {
+ fileRegion.customShuffleTransfer(target, -1);
+ Assert.fail("Expected a IllegalArgumentException");
+ } catch (IllegalArgumentException ie) {
+ LOG.info("Expected - illegal argument is passed.");
+ } catch (Exception e) {
+ Assert.fail("Expected a IllegalArgumentException");
+ }
+
+ //test corner cases
+ try {
+ fileRegion.customShuffleTransfer(target, count + 1);
+ Assert.fail("Expected a IllegalArgumentException");
+ } catch (IllegalArgumentException ie) {
+ LOG.info("Expected - illegal argument is passed.");
+ } catch (Exception e) {
+ Assert.fail("Expected a IllegalArgumentException");
+ }
+ }
+}