MAPREDUCE-6616. Fail to create jobhistory file if there are some multibyte characters in the job name. Contributed by Kousuke Saruta.

(cherry picked from commit d314a3f99d5d03c62a3b8fb8883886440ab7d1db)
This commit is contained in:
Akira Ajisaka 2016-01-29 16:19:28 +09:00
parent a24898fd49
commit 6ce9b1240e
3 changed files with 296 additions and 77 deletions

View File

@ -420,6 +420,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6563. Streaming documentation contains a stray '%' character. MAPREDUCE-6563. Streaming documentation contains a stray '%' character.
(cnauroth) (cnauroth)
MAPREDUCE-6616. Fail to create jobhistory file if there are some multibyte
characters in the job name. (Kousuke Saruta via aajisaka)
Release 2.7.3 - UNRELEASED Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -22,6 +22,7 @@
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.net.URLEncoder; import java.net.URLEncoder;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -57,7 +58,8 @@ public class FileNameIndexUtils {
* @param indexInfo the index info. * @param indexInfo the index info.
* @return the done job history filename. * @return the done job history filename.
*/ */
public static String getDoneFileName(JobIndexInfo indexInfo) throws IOException { public static String getDoneFileName(JobIndexInfo indexInfo)
throws IOException {
return getDoneFileName(indexInfo, return getDoneFileName(indexInfo,
JHAdminConfig.DEFAULT_MR_HS_JOBNAME_LIMIT); JHAdminConfig.DEFAULT_MR_HS_JOBNAME_LIMIT);
} }
@ -66,47 +68,56 @@ public static String getDoneFileName(JobIndexInfo indexInfo,
int jobNameLimit) throws IOException { int jobNameLimit) throws IOException {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
//JobId //JobId
sb.append(escapeDelimiters(TypeConverter.fromYarn(indexInfo.getJobId()).toString())); sb.append(encodeJobHistoryFileName(escapeDelimiters(
TypeConverter.fromYarn(indexInfo.getJobId()).toString())));
sb.append(DELIMITER); sb.append(DELIMITER);
//SubmitTime //SubmitTime
sb.append(indexInfo.getSubmitTime()); sb.append(encodeJobHistoryFileName(String.valueOf(
indexInfo.getSubmitTime())));
sb.append(DELIMITER); sb.append(DELIMITER);
//UserName //UserName
sb.append(escapeDelimiters(getUserName(indexInfo))); sb.append(encodeJobHistoryFileName(escapeDelimiters(
getUserName(indexInfo))));
sb.append(DELIMITER); sb.append(DELIMITER);
//JobName //JobName
sb.append(escapeDelimiters(trimJobName( sb.append(trimURLEncodedString(encodeJobHistoryFileName(escapeDelimiters(
getJobName(indexInfo), jobNameLimit))); getJobName(indexInfo))), jobNameLimit));
sb.append(DELIMITER); sb.append(DELIMITER);
//FinishTime //FinishTime
sb.append(indexInfo.getFinishTime()); sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getFinishTime())));
sb.append(DELIMITER); sb.append(DELIMITER);
//NumMaps //NumMaps
sb.append(indexInfo.getNumMaps()); sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getNumMaps())));
sb.append(DELIMITER); sb.append(DELIMITER);
//NumReduces //NumReduces
sb.append(indexInfo.getNumReduces()); sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getNumReduces())));
sb.append(DELIMITER); sb.append(DELIMITER);
//JobStatus //JobStatus
sb.append(indexInfo.getJobStatus()); sb.append(encodeJobHistoryFileName(indexInfo.getJobStatus()));
sb.append(DELIMITER); sb.append(DELIMITER);
//QueueName //QueueName
sb.append(escapeDelimiters(getQueueName(indexInfo))); sb.append(escapeDelimiters(encodeJobHistoryFileName(
getQueueName(indexInfo))));
sb.append(DELIMITER); sb.append(DELIMITER);
//JobStartTime //JobStartTime
sb.append(indexInfo.getJobStartTime()); sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getJobStartTime())));
sb.append(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION); sb.append(encodeJobHistoryFileName(
return encodeJobHistoryFileName(sb.toString()); JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
return sb.toString();
} }
/** /**
@ -116,21 +127,24 @@ public static String getDoneFileName(JobIndexInfo indexInfo,
* @param jhFileName the job history filename. * @param jhFileName the job history filename.
* @return a JobIndexInfo object built from the filename. * @return a JobIndexInfo object built from the filename.
*/ */
public static JobIndexInfo getIndexInfo(String jhFileName) throws IOException { public static JobIndexInfo getIndexInfo(String jhFileName)
String fileName = jhFileName.substring(0, jhFileName.indexOf(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION)); throws IOException {
String fileName = jhFileName.substring(0,
jhFileName.indexOf(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
JobIndexInfo indexInfo = new JobIndexInfo(); JobIndexInfo indexInfo = new JobIndexInfo();
String[] jobDetails = fileName.split(DELIMITER); String[] jobDetails = fileName.split(DELIMITER);
JobID oldJobId = JobID.forName(decodeJobHistoryFileName(jobDetails[JOB_ID_INDEX])); JobID oldJobId =
JobID.forName(decodeJobHistoryFileName(jobDetails[JOB_ID_INDEX]));
JobId jobId = TypeConverter.toYarn(oldJobId); JobId jobId = TypeConverter.toYarn(oldJobId);
indexInfo.setJobId(jobId); indexInfo.setJobId(jobId);
// Do not fail if there are some minor parse errors // Do not fail if there are some minor parse errors
try { try {
try { try {
indexInfo.setSubmitTime( indexInfo.setSubmitTime(Long.parseLong(
Long.parseLong(decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX]))); decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX])));
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
LOG.warn("Unable to parse submit time from job history file " LOG.warn("Unable to parse submit time from job history file "
+ jhFileName + " : " + e); + jhFileName + " : " + e);
@ -143,24 +157,24 @@ public static JobIndexInfo getIndexInfo(String jhFileName) throws IOException {
decodeJobHistoryFileName(jobDetails[JOB_NAME_INDEX])); decodeJobHistoryFileName(jobDetails[JOB_NAME_INDEX]));
try { try {
indexInfo.setFinishTime( indexInfo.setFinishTime(Long.parseLong(
Long.parseLong(decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX]))); decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX])));
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
LOG.warn("Unable to parse finish time from job history file " LOG.warn("Unable to parse finish time from job history file "
+ jhFileName + " : " + e); + jhFileName + " : " + e);
} }
try { try {
indexInfo.setNumMaps( indexInfo.setNumMaps(Integer.parseInt(
Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX]))); decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX])));
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
LOG.warn("Unable to parse num maps from job history file " LOG.warn("Unable to parse num maps from job history file "
+ jhFileName + " : " + e); + jhFileName + " : " + e);
} }
try { try {
indexInfo.setNumReduces( indexInfo.setNumReduces(Integer.parseInt(
Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX]))); decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX])));
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
LOG.warn("Unable to parse num reduces from job history file " LOG.warn("Unable to parse num reduces from job history file "
+ jhFileName + " : " + e); + jhFileName + " : " + e);
@ -176,8 +190,8 @@ public static JobIndexInfo getIndexInfo(String jhFileName) throws IOException {
if (jobDetails.length <= JOB_START_TIME_INDEX) { if (jobDetails.length <= JOB_START_TIME_INDEX) {
indexInfo.setJobStartTime(indexInfo.getSubmitTime()); indexInfo.setJobStartTime(indexInfo.getSubmitTime());
} else { } else {
indexInfo.setJobStartTime( indexInfo.setJobStartTime(Long.parseLong(
Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX]))); decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
} }
} catch (NumberFormatException e){ } catch (NumberFormatException e){
LOG.warn("Unable to parse start time from job history file " LOG.warn("Unable to parse start time from job history file "
@ -208,7 +222,8 @@ public static String encodeJobHistoryFileName(String logFileName)
if (logFileName.contains(DELIMITER_ESCAPE)) { if (logFileName.contains(DELIMITER_ESCAPE)) {
replacementDelimiterEscape = nonOccursString(logFileName); replacementDelimiterEscape = nonOccursString(logFileName);
logFileName = logFileName.replaceAll(DELIMITER_ESCAPE, replacementDelimiterEscape); logFileName = logFileName.replaceAll(
DELIMITER_ESCAPE, replacementDelimiterEscape);
} }
String encodedFileName = null; String encodedFileName = null;
@ -223,7 +238,8 @@ public static String encodeJobHistoryFileName(String logFileName)
// Restore protected escape delimiters after encoding // Restore protected escape delimiters after encoding
if (replacementDelimiterEscape != null) { if (replacementDelimiterEscape != null) {
encodedFileName = encodedFileName.replaceAll(replacementDelimiterEscape, DELIMITER_ESCAPE); encodedFileName = encodedFileName.replaceAll(
replacementDelimiterEscape, DELIMITER_ESCAPE);
} }
return encodedFileName; return encodedFileName;
@ -289,12 +305,59 @@ private static String escapeDelimiters(String escapee) {
} }
/** /**
* Trims the job-name if required * Trims the url-encoded string if required
*/ */
private static String trimJobName(String jobName, int jobNameLimit) { private static String trimURLEncodedString(
if (jobName.length() > jobNameLimit) { String encodedString, int limitLength) {
jobName = jobName.substring(0, jobNameLimit); assert(limitLength >= 0) : "limitLength should be positive integer";
if (encodedString.length() < limitLength) {
return encodedString;
} }
return jobName;
int index = 0;
int increase = 0;
byte[] strBytes = encodedString.getBytes(UTF_8);
// calculate effective character length based on UTF-8 specification.
// The size of a character coded in UTF-8 should be 4-byte at most.
// See RFC3629
while (true) {
byte b = strBytes[index];
if (b == '%') {
byte minuend1 = strBytes[index + 1];
byte subtrahend1 = (byte)(Character.isDigit(
minuend1) ? '0' : 'A' - 10);
byte minuend2 = strBytes[index + 2];
byte subtrahend2 = (byte)(Character.isDigit(
minuend2) ? '0' : 'A' - 10);
int initialHex =
((Character.toUpperCase(minuend1) - subtrahend1) << 4) +
(Character.toUpperCase(minuend2) - subtrahend2);
if (0x00 <= initialHex && initialHex <= 0x7F) {
// For 1-byte UTF-8 characters
increase = 3;
} else if (0xC2 <= initialHex && initialHex <= 0xDF) {
// For 2-byte UTF-8 characters
increase = 6;
} else if (0xE0 <= initialHex && initialHex <= 0xEF) {
// For 3-byte UTF-8 characters
increase = 9;
} else {
// For 4-byte UTF-8 characters
increase = 12;
}
} else {
increase = 1;
}
if (index + increase > limitLength) {
break;
} else {
index += increase;
}
}
return encodedString.substring(0, index);
} }
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.jobhistory; package org.apache.hadoop.mapreduce.v2.jobhistory;
import java.io.IOException; import java.io.IOException;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
@ -173,6 +174,158 @@ public void testTrimJobName() throws IOException {
parsedInfo.getJobName()); parsedInfo.getJobName());
} }
/**
* Verify the name of jobhistory file is not greater than 255 bytes
* even if there are some multibyte characters in the job name.
*/
@Test
public void testJobNameWithMultibyteChars() throws IOException {
JobIndexInfo info = new JobIndexInfo();
JobID oldJobId = JobID.forName(JOB_ID);
JobId jobId = TypeConverter.toYarn(oldJobId);
info.setJobId(jobId);
info.setSubmitTime(Long.parseLong(SUBMIT_TIME));
info.setUser(USER_NAME);
StringBuilder sb = new StringBuilder();
info.setFinishTime(Long.parseLong(FINISH_TIME));
info.setNumMaps(Integer.parseInt(NUM_MAPS));
info.setNumReduces(Integer.parseInt(NUM_REDUCES));
info.setJobStatus(JOB_STATUS);
info.setQueueName(QUEUE_NAME);
info.setJobStartTime(Long.parseLong(JOB_START_TIME));
// Test for 1 byte UTF-8 character
// which is encoded into 1 x 3 = 3 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append('%');
}
String longJobName = sb.toString();
info.setJobName(longJobName);
String jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 50);
Assert.assertTrue(jobHistoryFile.length() <= 255);
String trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 3 x 16 < 50 < 3 x 17 so the length of trimedJobName should be 48
Assert.assertEquals(48, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
byte[] trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
String reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for 2 bytes UTF-8 character
// which is encoded into 2 x 3 = 6 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append('\u03A9'); // large omega
}
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 27);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 6 x 4 < 27 < 6 x 5 so the length of trimedJobName should be 24
Assert.assertEquals(24, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for 3 bytes UTF-8 character
// which is encoded into 3 x 3 = 9 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append('\u2192'); // rightwards arrow
}
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 40);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 9 x 4 < 40 < 9 x 5 so the length of trimedJobName should be 36
Assert.assertEquals(36, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for 4 bytes UTF-8 character
// which is encoded into 4 x 3 = 12 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append("\uD867\uDE3D"); // Mugil cephalus in Kanji.
}
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 49);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 12 x 4 < 49 < 12 x 5 so the length of trimedJobName should be 48
Assert.assertEquals(48, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for the combination of 1 to 4 bytes UTF-8 characters
sb.append('\u732B') // cat in Kanji (encoded into 3 bytes x 3 characters)
.append("[") // (encoded into 1 byte x 3 characters)
.append('\u03BB') // small lambda (encoded into 2 bytes x 3 characters)
.append('/') // (encoded into 1 byte x 3 characters)
.append('A') // not url-encoded (1 byte x 1 character)
.append("\ud867\ude49") // flying fish in
// Kanji (encoded into 4 bytes x 3 characters)
.append('\u72AC'); // dog in Kanji (encoded into 3 bytes x 3 characters)
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 23);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// total size of the first 5 characters = 22
// 23 < total size of the first 6 characters
Assert.assertEquals(22, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
}
@Test @Test
public void testUserNamePercentDecoding() throws IOException { public void testUserNamePercentDecoding() throws IOException {
String jobHistoryFile = String.format(JOB_HISTORY_FILE_FORMATTER, String jobHistoryFile = String.format(JOB_HISTORY_FILE_FORMATTER,