diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 8cbae819958..ed197f25cc1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -131,6 +132,12 @@ import org.mortbay.jetty.HttpHeaders; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.cache.Weigher; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; @@ -156,6 +163,9 @@ public class ShuffleHandler extends AuxiliaryService { protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0); + private static final String DATA_FILE_NAME = "file.out"; + private static final String INDEX_FILE_NAME = "file.out.index"; + private int port; private ChannelFactory selector; private final ChannelGroup accepted = new DefaultChannelGroup(); @@ -294,12 +304,12 @@ public class ShuffleHandler extends AuxiliaryService { private ChannelHandlerContext ctx; private String user; private Map infoMap; - private String outputBasePathStr; + private String jobId; public ReduceContext(List mapIds, int rId, ChannelHandlerContext context, String usr, Map mapOutputInfoMap, - String outputBasePath) { + String jobId) { this.mapIds = mapIds; this.reduceId = rId; @@ -319,7 +329,7 @@ public class ShuffleHandler extends AuxiliaryService { this.ctx = context; this.user = usr; this.infoMap = mapOutputInfoMap; - this.outputBasePathStr = outputBasePath; + this.jobId = jobId; } public int getReduceId() { @@ -338,8 +348,8 @@ public class ShuffleHandler extends AuxiliaryService { return infoMap; } - public String getOutputBasePathStr() { - return outputBasePathStr; + public String getJobId() { + return jobId; } public List getMapIds() { @@ -780,18 +790,63 @@ public class ShuffleHandler extends AuxiliaryService { class Shuffle extends SimpleChannelUpstreamHandler { + private static final int MAX_WEIGHT = 10 * 1024 * 1024; + private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5; + private static final int ALLOWED_CONCURRENCY = 16; private final Configuration conf; private final IndexCache indexCache; private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); private int port; + private final LoadingCache pathCache = + CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES, + TimeUnit.MINUTES).softValues().concurrencyLevel(ALLOWED_CONCURRENCY). + removalListener( + new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + if (LOG.isDebugEnabled()) { + LOG.debug("PathCache Eviction: " + notification.getKey() + + ", Reason=" + notification.getCause()); + } + } + } + ).maximumWeight(MAX_WEIGHT).weigher( + new Weigher() { + @Override + public int weigh(AttemptPathIdentifier key, + AttemptPathInfo value) { + return key.jobId.length() + key.user.length() + + key.attemptId.length()+ + value.indexPath.toString().length() + + value.dataPath.toString().length(); + } + } + ).build(new CacheLoader() { + @Override + public AttemptPathInfo load(AttemptPathIdentifier key) throws + Exception { + String base = getBaseLocation(key.jobId, key.user); + String attemptBase = base + key.attemptId; + Path indexFileName = lDirAlloc.getLocalPathToRead( + attemptBase + "/" + INDEX_FILE_NAME, conf); + Path mapOutputFileName = lDirAlloc.getLocalPathToRead( + attemptBase + "/" + DATA_FILE_NAME, conf); + + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded : " + key + " via loader"); + } + return new AttemptPathInfo(indexFileName, mapOutputFileName); + } + }); public Shuffle(Configuration conf) { this.conf = conf; indexCache = new IndexCache(new JobConf(conf)); this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); } - + public void setPort(int port) { this.port = port; } @@ -908,13 +963,8 @@ public class ShuffleHandler extends AuxiliaryService { Channel ch = evt.getChannel(); String user = userRsrc.get(jobId); - // $x/$user/appcache/$appId/output/$mapId - // TODO: Once Shuffle is out of NM, this can use MR APIs to convert - // between App and Job - String outputBasePathStr = getBaseLocation(jobId, user); - try { - populateHeaders(mapIds, outputBasePathStr, user, reduceId, request, + populateHeaders(mapIds, jobId, user, reduceId, request, response, keepAliveParam, mapOutputInfoMap); } catch(IOException e) { ch.write(response); @@ -926,7 +976,7 @@ public class ShuffleHandler extends AuxiliaryService { ch.write(response); //Initialize one ReduceContext object per messageReceived call ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx, - user, mapOutputInfoMap, outputBasePathStr); + user, mapOutputInfoMap, jobId); for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) { ChannelFuture nextMap = sendMap(reduceContext); if(nextMap == null) { @@ -957,9 +1007,8 @@ public class ShuffleHandler extends AuxiliaryService { try { MapOutputInfo info = reduceContext.getInfoMap().get(mapId); if (info == null) { - info = getMapOutputInfo(reduceContext.getOutputBasePathStr() + - mapId, mapId, reduceContext.getReduceId(), - reduceContext.getUser()); + info = getMapOutputInfo(mapId, reduceContext.getReduceId(), + reduceContext.getJobId(), reduceContext.getUser()); } nextMap = sendMapOutput( reduceContext.getCtx(), @@ -1003,46 +1052,58 @@ public class ShuffleHandler extends AuxiliaryService { return baseStr; } - protected MapOutputInfo getMapOutputInfo(String base, String mapId, - int reduce, String user) throws IOException { - // Index file - Path indexFileName = - lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf); - IndexRecord info = - indexCache.getIndexInformation(mapId, reduce, indexFileName, user); - - Path mapOutputFileName = - lDirAlloc.getLocalPathToRead(base + "/file.out", conf); - if (LOG.isDebugEnabled()) { - LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName); + protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, + String jobId, String user) throws IOException { + AttemptPathInfo pathInfo; + try { + AttemptPathIdentifier identifier = new AttemptPathIdentifier( + jobId, user, mapId); + pathInfo = pathCache.get(identifier); + if (LOG.isDebugEnabled()) { + LOG.debug("Retrieved pathInfo for " + identifier + + " check for corresponding loaded messages to determine whether" + + " it was loaded or cached"); + } + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } } - MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info); + + IndexRecord info = + indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user); + + if (LOG.isDebugEnabled()) { + LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId + + ",dataFile=" + pathInfo.dataPath + ", indexFile=" + + pathInfo.indexPath); + } + + MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, info); return outputInfo; } - protected void populateHeaders(List mapIds, String outputBaseStr, + protected void populateHeaders(List mapIds, String jobId, String user, int reduce, HttpRequest request, HttpResponse response, boolean keepAliveParam, Map mapOutputInfoMap) throws IOException { long contentLength = 0; for (String mapId : mapIds) { - String base = outputBaseStr + mapId; - MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user); + MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user); if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) { mapOutputInfoMap.put(mapId, outputInfo); } - // Index file - Path indexFileName = - lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf); - IndexRecord info = - indexCache.getIndexInformation(mapId, reduce, indexFileName, user); + ShuffleHeader header = - new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce); + new ShuffleHeader(mapId, outputInfo.indexRecord.partLength, + outputInfo.indexRecord.rawLength, reduce); DataOutputBuffer dob = new DataOutputBuffer(); header.write(dob); - contentLength += info.partLength; + contentLength += outputInfo.indexRecord.partLength; contentLength += dob.getLength(); } @@ -1215,4 +1276,64 @@ public class ShuffleHandler extends AuxiliaryService { } } } + + static class AttemptPathInfo { + // TODO Change this over to just store local dir indices, instead of the + // entire path. Far more efficient. + private final Path indexPath; + private final Path dataPath; + + public AttemptPathInfo(Path indexPath, Path dataPath) { + this.indexPath = indexPath; + this.dataPath = dataPath; + } + } + + static class AttemptPathIdentifier { + private final String jobId; + private final String user; + private final String attemptId; + + public AttemptPathIdentifier(String jobId, String user, String attemptId) { + this.jobId = jobId; + this.user = user; + this.attemptId = attemptId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + AttemptPathIdentifier that = (AttemptPathIdentifier) o; + + if (!attemptId.equals(that.attemptId)) { + return false; + } + if (!jobId.equals(that.jobId)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = jobId.hashCode(); + result = 31 * result + attemptId.hashCode(); + return result; + } + + @Override + public String toString() { + return "AttemptPathIdentifier{" + + "attemptId='" + attemptId + '\'' + + ", jobId='" + jobId + '\'' + + '}'; + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index ff8920f648f..5068ea57468 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -110,8 +110,8 @@ public class TestShuffleHandler { throws IOException { } @Override - protected MapOutputInfo getMapOutputInfo(String base, String mapId, - int reduce, String user) throws IOException { + protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, + String jobId, String user) throws IOException { // Do nothing. return null; } @@ -230,8 +230,8 @@ public class TestShuffleHandler { // replace the shuffle handler with one stubbed for testing return new Shuffle(conf) { @Override - protected MapOutputInfo getMapOutputInfo(String base, String mapId, - int reduce, String user) throws IOException { + protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, + String jobId, String user) throws IOException { return null; } @Override @@ -325,8 +325,8 @@ public class TestShuffleHandler { // replace the shuffle handler with one stubbed for testing return new Shuffle(conf) { @Override - protected MapOutputInfo getMapOutputInfo(String base, String mapId, - int reduce, String user) throws IOException { + protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, + String jobId, String user) throws IOException { return null; } @Override @@ -534,8 +534,8 @@ public class TestShuffleHandler { // replace the shuffle handler with one stubbed for testing return new Shuffle(conf) { @Override - protected MapOutputInfo getMapOutputInfo(String base, String mapId, - int reduce, String user) throws IOException { + protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, + String jobId, String user) throws IOException { // Do nothing. return null; }