From f7545f076e36322995560e889e79b818860462b2 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Thu, 2 Aug 2012 21:57:42 +0000 Subject: [PATCH] Merge MAPREDUCE-3289 from trunk. Make use of fadvise in the NM's shuffle handler. (Contributed by Todd Lipcon and Siddharth Seth) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1368722 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapred/FadvisedChunkedFile.java | 80 ++++++++++++++++++ .../hadoop/mapred/FadvisedFileRegion.java | 82 +++++++++++++++++++ .../apache/hadoop/mapred/ShuffleHandler.java | 52 ++++++++---- 4 files changed, 202 insertions(+), 15 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 154db319712..6fa04d5822d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -68,6 +68,9 @@ Release 2.1.0-alpha - Unreleased MAPREDUCE-4427. Added an 'unmanaged' mode for AMs so as to ease development of new applications. (Bikas Saha via acmurthy) + MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler. + (Todd Lipcon and Siddharth Seth via sseth) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java new file mode 100644 index 00000000000..d3b181b69b4 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.RandomAccessFile; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.jboss.netty.handler.stream.ChunkedFile; + +public class FadvisedChunkedFile extends ChunkedFile { + + private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class); + + private final boolean manageOsCache; + private final int readaheadLength; + private final ReadaheadPool readaheadPool; + private final FileDescriptor fd; + private final String identifier; + + private ReadaheadRequest readaheadRequest; + + public FadvisedChunkedFile(RandomAccessFile file, long position, long count, + int chunkSize, boolean manageOsCache, int readaheadLength, + ReadaheadPool readaheadPool, String identifier) throws IOException { + super(file, position, count, chunkSize); + this.manageOsCache = manageOsCache; + this.readaheadLength = readaheadLength; + this.readaheadPool = readaheadPool; + this.fd = file.getFD(); + this.identifier = identifier; + } + + @Override + public Object nextChunk() throws Exception { + if (manageOsCache && readaheadPool != null) { + readaheadRequest = readaheadPool + .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength, + getEndOffset(), readaheadRequest); + } + return super.nextChunk(); + } + + @Override + public void close() throws Exception { + if (readaheadRequest != null) { + readaheadRequest.cancel(); + } + if (manageOsCache && getEndOffset() - getStartOffset() > 0) { + try { + NativeIO.posixFadviseIfPossible(fd, getStartOffset(), getEndOffset() + - getStartOffset(), NativeIO.POSIX_FADV_DONTNEED); + } catch (Throwable t) { + LOG.warn("Failed to manage OS cache for " + identifier, t); + } + } + super.close(); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java new file mode 100644 index 00000000000..6ccbe251f80 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.WritableByteChannel; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.jboss.netty.channel.DefaultFileRegion; + +public class FadvisedFileRegion extends DefaultFileRegion { + + private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class); + + private final boolean manageOsCache; + private final int readaheadLength; + private final ReadaheadPool readaheadPool; + private final FileDescriptor fd; + private final String identifier; + + private ReadaheadRequest readaheadRequest; + + public FadvisedFileRegion(RandomAccessFile file, long position, long count, + boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, + String identifier) throws IOException { + super(file.getChannel(), position, count); + this.manageOsCache = manageOsCache; + this.readaheadLength = readaheadLength; + this.readaheadPool = readaheadPool; + this.fd = file.getFD(); + this.identifier = identifier; + } + + @Override + public long transferTo(WritableByteChannel target, long position) + throws IOException { + if (manageOsCache && readaheadPool != null) { + readaheadRequest = readaheadPool.readaheadStream(identifier, fd, + getPosition() + position, readaheadLength, + getPosition() + getCount(), readaheadRequest); + } + return super.transferTo(target, position); + } + + @Override + public void releaseExternalResources() { + if (readaheadRequest != null) { + readaheadRequest.cancel(); + } + if (manageOsCache && getCount() > 0) { + try { + NativeIO.posixFadviseIfPossible(fd, getPosition(), getCount(), + NativeIO.POSIX_FADV_DONTNEED); + } catch (Throwable t) { + LOG.warn("Failed to manage OS cache for " + identifier, t); + } + } + super.releaseExternalResources(); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index a5717c99772..63da26c17be 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -55,6 +55,7 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.security.ssl.SSLFactory; @@ -86,9 +87,7 @@ import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.DefaultFileRegion; import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.FileRegion; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.group.ChannelGroup; @@ -104,7 +103,6 @@ import org.jboss.netty.handler.codec.http.HttpResponseEncoder; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.handler.stream.ChunkedFile; import org.jboss.netty.handler.stream.ChunkedWriteHandler; import org.jboss.netty.util.CharsetUtil; @@ -114,6 +112,12 @@ public class ShuffleHandler extends AbstractService implements AuxServices.AuxiliaryService { private static final Log LOG = LogFactory.getLog(ShuffleHandler.class); + + public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache"; + public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true; + + public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes"; + public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; private int port; private ChannelFactory selector; @@ -121,6 +125,15 @@ public class ShuffleHandler extends AbstractService private HttpPipelineFactory pipelineFact; private int sslFileBufferSize; + /** + * Should the shuffle use posix_fadvise calls to manage the OS cache during + * sendfile + */ + private boolean manageOsCache; + private int readaheadLength; + private ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); + + public static final String MAPREDUCE_SHUFFLE_SERVICEID = "mapreduce.shuffle"; @@ -242,6 +255,12 @@ public class ShuffleHandler extends AbstractService @Override public synchronized void init(Configuration conf) { + manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE, + DEFAULT_SHUFFLE_MANAGE_OS_CACHE); + + readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, + DEFAULT_SHUFFLE_READAHEAD_BYTES); + ThreadFactory bossFactory = new ThreadFactoryBuilder() .setNameFormat("ShuffleHandler Netty Boss #%d") .build(); @@ -503,14 +522,14 @@ public class ShuffleHandler extends AbstractService base + "/file.out", conf); LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " + indexFileName); - IndexRecord info = + final IndexRecord info = indexCache.getIndexInformation(mapId, reduce, indexFileName, user); final ShuffleHeader header = new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce); final DataOutputBuffer dob = new DataOutputBuffer(); header.write(dob); ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); - File spillfile = new File(mapOutputFileName.toString()); + final File spillfile = new File(mapOutputFileName.toString()); RandomAccessFile spill; try { spill = new RandomAccessFile(spillfile, "r"); @@ -520,22 +539,25 @@ public class ShuffleHandler extends AbstractService } ChannelFuture writeFuture; if (ch.getPipeline().get(SslHandler.class) == null) { - final FileRegion partition = new DefaultFileRegion( - spill.getChannel(), info.startOffset, info.partLength); + final FadvisedFileRegion partition = new FadvisedFileRegion(spill, + info.startOffset, info.partLength, manageOsCache, readaheadLength, + readaheadPool, spillfile.getAbsolutePath()); writeFuture = ch.write(partition); writeFuture.addListener(new ChannelFutureListener() { // TODO error handling; distinguish IO/connection failures, // attribute to appropriate spill output - @Override - public void operationComplete(ChannelFuture future) { - partition.releaseExternalResources(); - } - }); + @Override + public void operationComplete(ChannelFuture future) { + partition.releaseExternalResources(); + } + }); } else { // HTTPS cannot be done with zero copy. - writeFuture = ch.write(new ChunkedFile(spill, info.startOffset, - info.partLength, - sslFileBufferSize)); + final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, + info.startOffset, info.partLength, sslFileBufferSize, + manageOsCache, readaheadLength, readaheadPool, + spillfile.getAbsolutePath()); + writeFuture = ch.write(chunk); } metrics.shuffleConnections.incr(); metrics.shuffleOutputBytes.incr(info.partLength); // optimistic