MAPREDUCE-6958. Shuffle audit logger should log size of shuffle transfer. Contributed by Jason Lowe

This commit is contained in:
Jason Lowe 2017-09-19 10:27:04 -05:00
parent b1e1f7546c
commit 03a380daec
2 changed files with 22 additions and 18 deletions

View File

@ -923,13 +923,6 @@ public class ShuffleHandler extends AuxiliaryService {
return;
}
// this audit log is disabled by default,
// to turn it on please enable this audit log
// on log4j.properties by uncommenting the setting
if (AUDITLOG.isDebugEnabled()) {
AUDITLOG.debug("shuffle for " + jobQ.get(0) + " mappers: " + mapIds +
" reducer " + reduceQ.get(0));
}
int reduceId;
String jobId;
try {
@ -973,8 +966,8 @@ public class ShuffleHandler extends AuxiliaryService {
String outputBasePathStr = getBaseLocation(jobId, user);
try {
populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
response, keepAliveParam, mapOutputInfoMap);
populateHeaders(mapIds, jobId, outputBasePathStr, user, reduceId,
request, response, keepAliveParam, mapOutputInfoMap);
} catch(IOException e) {
ch.write(response);
LOG.error("Shuffle error in populating headers :", e);
@ -1080,10 +1073,10 @@ public class ShuffleHandler extends AuxiliaryService {
return outputInfo;
}
protected void populateHeaders(List<String> mapIds, String outputBaseStr,
String user, int reduce, HttpRequest request, HttpResponse response,
boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
throws IOException {
protected void populateHeaders(List<String> mapIds, String jobId,
String outputBaseStr, String user, int reduce, HttpRequest request,
HttpResponse response, boolean keepAliveParam,
Map<String, MapOutputInfo> mapOutputInfoMap) throws IOException {
long contentLength = 0;
for (String mapId : mapIds) {
@ -1108,6 +1101,17 @@ public class ShuffleHandler extends AuxiliaryService {
// Now set the response headers.
setResponseHeaders(response, keepAliveParam, contentLength);
// this audit log is disabled by default,
// to turn it on please enable this audit log
// on log4j.properties by uncommenting the setting
if (AUDITLOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder("shuffle for ");
sb.append(jobId).append(" reducer ").append(reduce);
sb.append(" length ").append(contentLength);
sb.append(" mappers: ").append(mapIds);
AUDITLOG.debug(sb.toString());
}
}
protected void setResponseHeaders(HttpResponse response,

View File

@ -118,7 +118,7 @@ public class TestShuffleHandler {
}
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
String user, int reduce, HttpRequest request,
String outputBase, String user, int reduce, HttpRequest request,
HttpResponse response, boolean keepAliveParam,
Map<String, MapOutputInfo> infoMap) throws IOException {
// Do nothing.
@ -237,7 +237,7 @@ public class TestShuffleHandler {
}
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
String user, int reduce, HttpRequest request,
String outputBase, String user, int reduce, HttpRequest request,
HttpResponse response, boolean keepAliveParam,
Map<String, MapOutputInfo> infoMap) throws IOException {
// Only set response headers and skip everything else
@ -349,7 +349,7 @@ public class TestShuffleHandler {
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
String user, int reduce, HttpRequest request,
String outputBase, String user, int reduce, HttpRequest request,
HttpResponse response, boolean keepAliveParam,
Map<String, MapOutputInfo> infoMap) throws IOException {
// Send some dummy data (populate content length details)
@ -565,7 +565,7 @@ public class TestShuffleHandler {
}
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
String user, int reduce, HttpRequest request,
String outputBase, String user, int reduce, HttpRequest request,
HttpResponse response, boolean keepAliveParam,
Map<String, MapOutputInfo> infoMap) throws IOException {
// Do nothing.
@ -988,7 +988,7 @@ public class TestShuffleHandler {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
protected void populateHeaders(List<String> mapIds,
protected void populateHeaders(List<String> mapIds, String jobId,
String outputBaseStr, String user, int reduce,
HttpRequest request, HttpResponse response,
boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)