MAPREDUCE-6474. ShuffleHandler can possibly exhaust nodemanager file descriptors. Contributed by Kuhu Shukla

This commit is contained in:
Jason Lowe 2015-09-10 16:00:17 +00:00
parent a40342b0da
commit 8e615588d5
3 changed files with 299 additions and 22 deletions

View File

@ -590,6 +590,9 @@ Release 2.7.2 - UNRELEASED
MAPREDUCE-6442. Stack trace is missing when error occurs in client protocol
provider's constructor (Chang Li via ozawa)
MAPREDUCE-6474. ShuffleHandler can possibly exhaust nodemanager file
descriptors (Kuhu Shukla via jlowe)
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES

View File

@ -49,6 +49,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
@ -170,6 +171,7 @@ public class ShuffleHandler extends AuxiliaryService {
private int maxShuffleConnections;
private int shuffleBufferSize;
private boolean shuffleTransferToAllowed;
private int maxSessionOpenFiles;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
private Map<String,String> userRsrc;
@ -220,6 +222,13 @@ public class ShuffleHandler extends AuxiliaryService {
public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
false;
/* the maximum number of files a single GET request can
open simultaneously during shuffle
*/
public static final String SHUFFLE_MAX_SESSION_OPEN_FILES =
"mapreduce.shuffle.max.session-open-files";
public static final int DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES = 3;
boolean connectionKeepAliveEnabled = false;
int connectionKeepAliveTimeOut;
int mapOutputMetaInfoCacheSize;
@ -248,6 +257,104 @@ public void operationComplete(ChannelFuture future) throws Exception {
final ShuffleMetrics metrics;
class ReduceMapFileCount implements ChannelFutureListener {
private ReduceContext reduceContext;
public ReduceMapFileCount(ReduceContext rc) {
this.reduceContext = rc;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
future.getChannel().close();
return;
}
int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
if (waitCount == 0) {
metrics.operationComplete(future);
future.getChannel().close();
} else {
pipelineFact.getSHUFFLE().sendMap(reduceContext);
}
}
}
/**
* Maintain parameters per messageReceived() Netty context.
* Allows sendMapOutput calls from operationComplete()
*/
private static class ReduceContext {
private List<String> mapIds;
private AtomicInteger mapsToWait;
private AtomicInteger mapsToSend;
private int reduceId;
private ChannelHandlerContext ctx;
private String user;
private Map<String, Shuffle.MapOutputInfo> infoMap;
private String outputBasePathStr;
public ReduceContext(List<String> mapIds, int rId,
ChannelHandlerContext context, String usr,
Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
String outputBasePath) {
this.mapIds = mapIds;
this.reduceId = rId;
/**
* Atomic count for tracking the no. of map outputs that are yet to
* complete. Multiple futureListeners' operationComplete() can decrement
* this value asynchronously. It is used to decide when the channel should
* be closed.
*/
this.mapsToWait = new AtomicInteger(mapIds.size());
/**
* Atomic count for tracking the no. of map outputs that have been sent.
* Multiple sendMap() calls can increment this value
* asynchronously. Used to decide which mapId should be sent next.
*/
this.mapsToSend = new AtomicInteger(0);
this.ctx = context;
this.user = usr;
this.infoMap = mapOutputInfoMap;
this.outputBasePathStr = outputBasePath;
}
public int getReduceId() {
return reduceId;
}
public ChannelHandlerContext getCtx() {
return ctx;
}
public String getUser() {
return user;
}
public Map<String, Shuffle.MapOutputInfo> getInfoMap() {
return infoMap;
}
public String getOutputBasePathStr() {
return outputBasePathStr;
}
public List<String> getMapIds() {
return mapIds;
}
public AtomicInteger getMapsToSend() {
return mapsToSend;
}
public AtomicInteger getMapsToWait() {
return mapsToWait;
}
}
ShuffleHandler(MetricsSystem ms) {
super("httpshuffle");
metrics = ms.register(new ShuffleMetrics());
@ -357,6 +464,9 @@ protected void serviceInit(Configuration conf) throws Exception {
(Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
maxSessionOpenFiles = conf.getInt(SHUFFLE_MAX_SESSION_OPEN_FILES,
DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Boss #%d")
.build();
@ -638,6 +748,10 @@ public HttpPipelineFactory(Configuration conf) throws Exception {
}
}
public Shuffle getSHUFFLE() {
return SHUFFLE;
}
public void destroy() {
if (sslFactory != null) {
sslFactory.destroy();
@ -809,31 +923,62 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
return;
}
ch.write(response);
// TODO refactor the following into the pipeline
ChannelFuture lastMap = null;
for (String mapId : mapIds) {
try {
MapOutputInfo info = mapOutputInfoMap.get(mapId);
if (info == null) {
info = getMapOutputInfo(outputBasePathStr + mapId,
mapId, reduceId, user);
}
lastMap =
sendMapOutput(ctx, ch, user, mapId,
reduceId, info);
if (null == lastMap) {
sendError(ctx, NOT_FOUND);
return;
}
} catch (IOException e) {
LOG.error("Shuffle error :", e);
String errorMessage = getErrorMessage(e);
sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
//Initialize one ReduceContext object per messageReceived call
ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
user, mapOutputInfoMap, outputBasePathStr);
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
ChannelFuture nextMap = sendMap(reduceContext);
if(nextMap == null) {
return;
}
}
lastMap.addListener(metrics);
lastMap.addListener(ChannelFutureListener.CLOSE);
}
/**
* Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
* and increments it. This method is first called by messageReceived()
* maxSessionOpenFiles times and then on the completion of every
* sendMapOutput operation. This limits the number of open files on a node,
* which can get really large(exhausting file descriptors on the NM) if all
* sendMapOutputs are called in one go, as was done previous to this change.
* @param reduceContext used to call sendMapOutput with correct params.
* @return the ChannelFuture of the sendMapOutput, can be null.
*/
public ChannelFuture sendMap(ReduceContext reduceContext)
throws Exception {
ChannelFuture nextMap = null;
if (reduceContext.getMapsToSend().get() <
reduceContext.getMapIds().size()) {
int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
String mapId = reduceContext.getMapIds().get(nextIndex);
try {
MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
if (info == null) {
info = getMapOutputInfo(reduceContext.getOutputBasePathStr() +
mapId, mapId, reduceContext.getReduceId(),
reduceContext.getUser());
}
nextMap = sendMapOutput(
reduceContext.getCtx(),
reduceContext.getCtx().getChannel(),
reduceContext.getUser(), mapId,
reduceContext.getReduceId(), info);
if (null == nextMap) {
sendError(reduceContext.getCtx(), NOT_FOUND);
return null;
}
nextMap.addListener(new ReduceMapFileCount(reduceContext));
} catch (IOException e) {
LOG.error("Shuffle error :", e);
String errorMessage = getErrorMessage(e);
sendError(reduceContext.getCtx(), errorMessage,
INTERNAL_SERVER_ERROR);
return null;
}
}
return nextMap;
}
private String getErrorMessage(Throwable t) {

View File

@ -22,6 +22,7 @@
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.test.MockitoMaker.make;
import static org.apache.hadoop.test.MockitoMaker.stub;
import static org.junit.Assert.assertTrue;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
@ -79,18 +80,66 @@
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.Mockito;
import org.mortbay.jetty.HttpHeaders;
public class TestShuffleHandler {
static final long MiB = 1024 * 1024;
private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
class MockShuffleHandler extends org.apache.hadoop.mapred.ShuffleHandler {
@Override
protected Shuffle getShuffle(final Configuration conf) {
return new Shuffle(conf) {
@Override
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
}
@Override
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
int reduce, String user) throws IOException {
// Do nothing.
return null;
}
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
String user, int reduce, HttpRequest request,
HttpResponse response, boolean keepAliveParam,
Map<String, MapOutputInfo> infoMap) throws IOException {
// Do nothing.
}
@Override
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
Channel ch, String user, String mapId, int reduce,
MapOutputInfo info) throws IOException {
ShuffleHeader header =
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i = 0; i < 100; ++i) {
header.write(dob);
}
return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
};
}
}
/**
* Test the validation of ShuffleHandler's meta-data's serialization and
* de-serialization.
@ -934,4 +983,84 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
FileUtil.fullyDelete(absLogDir);
}
}
@Test(timeout = 4000)
public void testSendMapCount() throws Exception {
final List<ShuffleHandler.ReduceMapFileCount> listenerList =
new ArrayList<ShuffleHandler.ReduceMapFileCount>();
final ChannelHandlerContext mockCtx =
Mockito.mock(ChannelHandlerContext.class);
final MessageEvent mockEvt = Mockito.mock(MessageEvent.class);
final Channel mockCh = Mockito.mock(AbstractChannel.class);
// Mock HttpRequest and ChannelFuture
final HttpRequest mockHttpRequest = createMockHttpRequest();
final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
listenerList);
// Mock Netty Channel Context and Channel behavior
Mockito.doReturn(mockCh).when(mockCtx).getChannel();
Mockito.when(mockCtx.getChannel()).thenReturn(mockCh);
Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture);
//Mock MessageEvent behavior
Mockito.doReturn(mockCh).when(mockEvt).getChannel();
Mockito.when(mockEvt.getChannel()).thenReturn(mockCh);
Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();
final ShuffleHandler sh = new MockShuffleHandler();
Configuration conf = new Configuration();
sh.init(conf);
sh.start();
int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
assertTrue("Number of Open files should not exceed the configured " +
"value!-Not Expected",
listenerList.size() <= maxOpenFiles);
while(!listenerList.isEmpty()) {
listenerList.remove(0).operationComplete(mockFuture);
assertTrue("Number of Open files should not exceed the configured " +
"value!-Not Expected",
listenerList.size() <= maxOpenFiles);
}
sh.close();
}
public ChannelFuture createMockChannelFuture(Channel mockCh,
final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
final ChannelFuture mockFuture = Mockito.mock(ChannelFuture.class);
Mockito.when(mockFuture.getChannel()).thenReturn(mockCh);
Mockito.doReturn(true).when(mockFuture).isSuccess();
Mockito.doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
//Add ReduceMapFileCount listener to a list
if (invocation.getArguments()[0].getClass() ==
ShuffleHandler.ReduceMapFileCount.class)
listenerList.add((ShuffleHandler.ReduceMapFileCount)
invocation.getArguments()[0]);
return null;
}
}).when(mockFuture).addListener(Mockito.any(
ShuffleHandler.ReduceMapFileCount.class));
return mockFuture;
}
public HttpRequest createMockHttpRequest() {
HttpRequest mockHttpRequest = Mockito.mock(HttpRequest.class);
Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod();
Mockito.doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
String uri = "/mapOutput?job=job_12345_1&reduce=1";
for (int i = 0; i < 100; i++)
uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
return uri;
}
}).when(mockHttpRequest).getUri();
return mockHttpRequest;
}
}