MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to
getMapOutputInfo if mapId is not in the cache. Contributed by zhihai xu.
This commit is contained in:
parent
688617d6d7
commit
bff67dfe2f
|
@ -542,6 +542,9 @@ Release 2.7.2 - UNRELEASED
|
|||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to getMapOutputInfo
|
||||
if mapId is not in the cache. (zhihai xu via devaraj)
|
||||
|
||||
Release 2.7.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -815,7 +815,8 @@ public class ShuffleHandler extends AuxiliaryService {
|
|||
try {
|
||||
MapOutputInfo info = mapOutputInfoMap.get(mapId);
|
||||
if (info == null) {
|
||||
info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user);
|
||||
info = getMapOutputInfo(outputBasePathStr + mapId,
|
||||
mapId, reduceId, user);
|
||||
}
|
||||
lastMap =
|
||||
sendMapOutput(ctx, ch, user, mapId,
|
||||
|
|
|
@ -601,6 +601,7 @@ public class TestShuffleHandler {
|
|||
Assert.assertTrue((new String(byteArr)).contains(message));
|
||||
} finally {
|
||||
shuffleHandler.stop();
|
||||
FileUtil.fullyDelete(absLogDir);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -829,4 +830,104 @@ public class TestShuffleHandler {
|
|||
conn.disconnect();
|
||||
return rc;
|
||||
}
|
||||
|
||||
@Test(timeout = 100000)
|
||||
public void testGetMapOutputInfo() throws Exception {
|
||||
final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
|
||||
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
|
||||
File absLogDir = new File("target", TestShuffleHandler.class.
|
||||
getSimpleName() + "LocDir").getAbsoluteFile();
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
|
||||
ApplicationId appId = ApplicationId.newInstance(12345, 1);
|
||||
String appAttemptId = "attempt_12345_1_m_1_0";
|
||||
String user = "randomUser";
|
||||
String reducerId = "0";
|
||||
List<File> fileMap = new ArrayList<File>();
|
||||
createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
|
||||
conf, fileMap);
|
||||
ShuffleHandler shuffleHandler = new ShuffleHandler() {
|
||||
@Override
|
||||
protected Shuffle getShuffle(Configuration conf) {
|
||||
// replace the shuffle handler with one stubbed for testing
|
||||
return new Shuffle(conf) {
|
||||
@Override
|
||||
protected void populateHeaders(List<String> mapIds,
|
||||
String outputBaseStr, String user, int reduce,
|
||||
HttpRequest request, HttpResponse response,
|
||||
boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
|
||||
throws IOException {
|
||||
// Only set response headers and skip everything else
|
||||
// send some dummy value for content-length
|
||||
super.setResponseHeaders(response, keepAliveParam, 100);
|
||||
}
|
||||
@Override
|
||||
protected void verifyRequest(String appid,
|
||||
ChannelHandlerContext ctx, HttpRequest request,
|
||||
HttpResponse response, URL requestUri) throws IOException {
|
||||
// Do nothing.
|
||||
}
|
||||
@Override
|
||||
protected void sendError(ChannelHandlerContext ctx, String message,
|
||||
HttpResponseStatus status) {
|
||||
if (failures.size() == 0) {
|
||||
failures.add(new Error(message));
|
||||
ctx.getChannel().close();
|
||||
}
|
||||
}
|
||||
@Override
|
||||
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
||||
Channel ch, String user, String mapId, int reduce,
|
||||
MapOutputInfo info) throws IOException {
|
||||
// send a shuffle header
|
||||
ShuffleHeader header =
|
||||
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
header.write(dob);
|
||||
return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
shuffleHandler.init(conf);
|
||||
try {
|
||||
shuffleHandler.start();
|
||||
DataOutputBuffer outputBuffer = new DataOutputBuffer();
|
||||
outputBuffer.reset();
|
||||
Token<JobTokenIdentifier> jt =
|
||||
new Token<JobTokenIdentifier>("identifier".getBytes(),
|
||||
"password".getBytes(), new Text(user), new Text("shuffleService"));
|
||||
jt.write(outputBuffer);
|
||||
shuffleHandler
|
||||
.initializeApplication(new ApplicationInitializationContext(user,
|
||||
appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
|
||||
outputBuffer.getLength())));
|
||||
URL url =
|
||||
new URL(
|
||||
"http://127.0.0.1:"
|
||||
+ shuffleHandler.getConfig().get(
|
||||
ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
|
||||
+ "/mapOutput?job=job_12345_0001&reduce=" + reducerId
|
||||
+ "&map=attempt_12345_1_m_1_0");
|
||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
|
||||
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
|
||||
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
|
||||
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
|
||||
conn.connect();
|
||||
try {
|
||||
DataInputStream is = new DataInputStream(conn.getInputStream());
|
||||
ShuffleHeader header = new ShuffleHeader();
|
||||
header.readFields(is);
|
||||
is.close();
|
||||
} catch (EOFException e) {
|
||||
// ignore
|
||||
}
|
||||
Assert.assertEquals(failures.size(), 0);
|
||||
} finally {
|
||||
shuffleHandler.stop();
|
||||
FileUtil.fullyDelete(absLogDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue