diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 428d37e909f..02c1f1f28c5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -590,6 +590,9 @@ Release 2.7.2 - UNRELEASED MAPREDUCE-6442. Stack trace is missing when error occurs in client protocol provider's constructor (Chang Li via ozawa) + MAPREDUCE-6474. ShuffleHandler can possibly exhaust nodemanager file + descriptors (Kuhu Shukla via jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index ee1be23407a..7a078a87f9f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -49,6 +49,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import javax.crypto.SecretKey; @@ -170,6 +171,7 @@ public class ShuffleHandler extends AuxiliaryService { private int maxShuffleConnections; private int shuffleBufferSize; private boolean shuffleTransferToAllowed; + private int maxSessionOpenFiles; private ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); private Map userRsrc; @@ -220,6 +222,13 @@ public class ShuffleHandler extends AuxiliaryService { public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = false; + /* the maximum number of files a single GET request can + open simultaneously during shuffle + */ + public static final String SHUFFLE_MAX_SESSION_OPEN_FILES = + "mapreduce.shuffle.max.session-open-files"; + public static final int DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES = 3; + boolean connectionKeepAliveEnabled = false; int connectionKeepAliveTimeOut; int mapOutputMetaInfoCacheSize; @@ -248,6 +257,104 @@ public void operationComplete(ChannelFuture future) throws Exception { final ShuffleMetrics metrics; + class ReduceMapFileCount implements ChannelFutureListener { + + private ReduceContext reduceContext; + + public ReduceMapFileCount(ReduceContext rc) { + this.reduceContext = rc; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + future.getChannel().close(); + return; + } + int waitCount = this.reduceContext.getMapsToWait().decrementAndGet(); + if (waitCount == 0) { + metrics.operationComplete(future); + future.getChannel().close(); + } else { + pipelineFact.getSHUFFLE().sendMap(reduceContext); + } + } + } + + /** + * Maintain parameters per messageReceived() Netty context. + * Allows sendMapOutput calls from operationComplete() + */ + private static class ReduceContext { + + private List mapIds; + private AtomicInteger mapsToWait; + private AtomicInteger mapsToSend; + private int reduceId; + private ChannelHandlerContext ctx; + private String user; + private Map infoMap; + private String outputBasePathStr; + + public ReduceContext(List mapIds, int rId, + ChannelHandlerContext context, String usr, + Map mapOutputInfoMap, + String outputBasePath) { + + this.mapIds = mapIds; + this.reduceId = rId; + /** + * Atomic count for tracking the no. of map outputs that are yet to + * complete. Multiple futureListeners' operationComplete() can decrement + * this value asynchronously. It is used to decide when the channel should + * be closed. + */ + this.mapsToWait = new AtomicInteger(mapIds.size()); + /** + * Atomic count for tracking the no. of map outputs that have been sent. + * Multiple sendMap() calls can increment this value + * asynchronously. Used to decide which mapId should be sent next. + */ + this.mapsToSend = new AtomicInteger(0); + this.ctx = context; + this.user = usr; + this.infoMap = mapOutputInfoMap; + this.outputBasePathStr = outputBasePath; + } + + public int getReduceId() { + return reduceId; + } + + public ChannelHandlerContext getCtx() { + return ctx; + } + + public String getUser() { + return user; + } + + public Map getInfoMap() { + return infoMap; + } + + public String getOutputBasePathStr() { + return outputBasePathStr; + } + + public List getMapIds() { + return mapIds; + } + + public AtomicInteger getMapsToSend() { + return mapsToSend; + } + + public AtomicInteger getMapsToWait() { + return mapsToWait; + } + } + ShuffleHandler(MetricsSystem ms) { super("httpshuffle"); metrics = ms.register(new ShuffleMetrics()); @@ -357,6 +464,9 @@ protected void serviceInit(Configuration conf) throws Exception { (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED: DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED); + maxSessionOpenFiles = conf.getInt(SHUFFLE_MAX_SESSION_OPEN_FILES, + DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES); + ThreadFactory bossFactory = new ThreadFactoryBuilder() .setNameFormat("ShuffleHandler Netty Boss #%d") .build(); @@ -638,6 +748,10 @@ public HttpPipelineFactory(Configuration conf) throws Exception { } } + public Shuffle getSHUFFLE() { + return SHUFFLE; + } + public void destroy() { if (sslFactory != null) { sslFactory.destroy(); @@ -809,31 +923,62 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) return; } ch.write(response); - // TODO refactor the following into the pipeline - ChannelFuture lastMap = null; - for (String mapId : mapIds) { - try { - MapOutputInfo info = mapOutputInfoMap.get(mapId); - if (info == null) { - info = getMapOutputInfo(outputBasePathStr + mapId, - mapId, reduceId, user); - } - lastMap = - sendMapOutput(ctx, ch, user, mapId, - reduceId, info); - if (null == lastMap) { - sendError(ctx, NOT_FOUND); - return; - } - } catch (IOException e) { - LOG.error("Shuffle error :", e); - String errorMessage = getErrorMessage(e); - sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR); + //Initialize one ReduceContext object per messageReceived call + ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx, + user, mapOutputInfoMap, outputBasePathStr); + for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) { + ChannelFuture nextMap = sendMap(reduceContext); + if(nextMap == null) { return; } } - lastMap.addListener(metrics); - lastMap.addListener(ChannelFutureListener.CLOSE); + } + + /** + * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend + * and increments it. This method is first called by messageReceived() + * maxSessionOpenFiles times and then on the completion of every + * sendMapOutput operation. This limits the number of open files on a node, + * which can get really large(exhausting file descriptors on the NM) if all + * sendMapOutputs are called in one go, as was done previous to this change. + * @param reduceContext used to call sendMapOutput with correct params. + * @return the ChannelFuture of the sendMapOutput, can be null. + */ + public ChannelFuture sendMap(ReduceContext reduceContext) + throws Exception { + + ChannelFuture nextMap = null; + if (reduceContext.getMapsToSend().get() < + reduceContext.getMapIds().size()) { + int nextIndex = reduceContext.getMapsToSend().getAndIncrement(); + String mapId = reduceContext.getMapIds().get(nextIndex); + + try { + MapOutputInfo info = reduceContext.getInfoMap().get(mapId); + if (info == null) { + info = getMapOutputInfo(reduceContext.getOutputBasePathStr() + + mapId, mapId, reduceContext.getReduceId(), + reduceContext.getUser()); + } + nextMap = sendMapOutput( + reduceContext.getCtx(), + reduceContext.getCtx().getChannel(), + reduceContext.getUser(), mapId, + reduceContext.getReduceId(), info); + if (null == nextMap) { + sendError(reduceContext.getCtx(), NOT_FOUND); + return null; + } + nextMap.addListener(new ReduceMapFileCount(reduceContext)); + } catch (IOException e) { + LOG.error("Shuffle error :", e); + String errorMessage = getErrorMessage(e); + sendError(reduceContext.getCtx(), errorMessage, + INTERNAL_SERVER_ERROR); + return null; + } + } + return nextMap; } private String getErrorMessage(Throwable t) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index bad9b2de4df..f16fe5a43ac 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.MockitoMaker.make; import static org.apache.hadoop.test.MockitoMaker.stub; +import static org.junit.Assert.assertTrue; import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; @@ -79,18 +80,66 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.AbstractChannel; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.junit.Assert; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.Mockito; import org.mortbay.jetty.HttpHeaders; public class TestShuffleHandler { static final long MiB = 1024 * 1024; private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class); + class MockShuffleHandler extends org.apache.hadoop.mapred.ShuffleHandler { + @Override + protected Shuffle getShuffle(final Configuration conf) { + return new Shuffle(conf) { + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + } + @Override + protected MapOutputInfo getMapOutputInfo(String base, String mapId, + int reduce, String user) throws IOException { + // Do nothing. + return null; + } + @Override + protected void populateHeaders(List mapIds, String jobId, + String user, int reduce, HttpRequest request, + HttpResponse response, boolean keepAliveParam, + Map infoMap) throws IOException { + // Do nothing. + } + @Override + protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, + Channel ch, String user, String mapId, int reduce, + MapOutputInfo info) throws IOException { + + ShuffleHeader header = + new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + dob = new DataOutputBuffer(); + for (int i = 0; i < 100; ++i) { + header.write(dob); + } + return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + } + }; + } + } + /** * Test the validation of ShuffleHandler's meta-data's serialization and * de-serialization. @@ -934,4 +983,84 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, FileUtil.fullyDelete(absLogDir); } } + + @Test(timeout = 4000) + public void testSendMapCount() throws Exception { + final List listenerList = + new ArrayList(); + + final ChannelHandlerContext mockCtx = + Mockito.mock(ChannelHandlerContext.class); + final MessageEvent mockEvt = Mockito.mock(MessageEvent.class); + final Channel mockCh = Mockito.mock(AbstractChannel.class); + + // Mock HttpRequest and ChannelFuture + final HttpRequest mockHttpRequest = createMockHttpRequest(); + final ChannelFuture mockFuture = createMockChannelFuture(mockCh, + listenerList); + + // Mock Netty Channel Context and Channel behavior + Mockito.doReturn(mockCh).when(mockCtx).getChannel(); + Mockito.when(mockCtx.getChannel()).thenReturn(mockCh); + Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class)); + Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture); + + //Mock MessageEvent behavior + Mockito.doReturn(mockCh).when(mockEvt).getChannel(); + Mockito.when(mockEvt.getChannel()).thenReturn(mockCh); + Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage(); + + final ShuffleHandler sh = new MockShuffleHandler(); + Configuration conf = new Configuration(); + sh.init(conf); + sh.start(); + int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES, + ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES); + sh.getShuffle(conf).messageReceived(mockCtx, mockEvt); + assertTrue("Number of Open files should not exceed the configured " + + "value!-Not Expected", + listenerList.size() <= maxOpenFiles); + while(!listenerList.isEmpty()) { + listenerList.remove(0).operationComplete(mockFuture); + assertTrue("Number of Open files should not exceed the configured " + + "value!-Not Expected", + listenerList.size() <= maxOpenFiles); + } + sh.close(); + } + + public ChannelFuture createMockChannelFuture(Channel mockCh, + final List listenerList) { + final ChannelFuture mockFuture = Mockito.mock(ChannelFuture.class); + Mockito.when(mockFuture.getChannel()).thenReturn(mockCh); + Mockito.doReturn(true).when(mockFuture).isSuccess(); + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + //Add ReduceMapFileCount listener to a list + if (invocation.getArguments()[0].getClass() == + ShuffleHandler.ReduceMapFileCount.class) + listenerList.add((ShuffleHandler.ReduceMapFileCount) + invocation.getArguments()[0]); + return null; + } + }).when(mockFuture).addListener(Mockito.any( + ShuffleHandler.ReduceMapFileCount.class)); + return mockFuture; + } + + public HttpRequest createMockHttpRequest() { + HttpRequest mockHttpRequest = Mockito.mock(HttpRequest.class); + Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod(); + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + String uri = "/mapOutput?job=job_12345_1&reduce=1"; + for (int i = 0; i < 100; i++) + uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0"); + return uri; + } + }).when(mockHttpRequest).getUri(); + return mockHttpRequest; + } }