MAPREDUCE-6616. Fail to create jobhistory file if there are some multibyte characters in the job name. Contributed by Kousuke Saruta.

(cherry picked from commit d314a3f99d5d03c62a3b8fb8883886440ab7d1db)
(cherry picked from commit 79cde21064a1fe274a4dcbf3dec4ed0d743876cf)
This commit is contained in:
Akira Ajisaka 2016-01-29 16:19:28 +09:00
parent 6a06a492ec
commit ce5947916d
3 changed files with 296 additions and 77 deletions

View File

@ -406,6 +406,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6563. Streaming documentation contains a stray '%' character.
(cnauroth)
MAPREDUCE-6616. Fail to create jobhistory file if there are some multibyte
characters in the job name. (Kousuke Saruta via aajisaka)
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -57,7 +58,8 @@ public class FileNameIndexUtils {
* @param indexInfo the index info.
* @return the done job history filename.
*/
public static String getDoneFileName(JobIndexInfo indexInfo) throws IOException {
public static String getDoneFileName(JobIndexInfo indexInfo)
throws IOException {
return getDoneFileName(indexInfo,
JHAdminConfig.DEFAULT_MR_HS_JOBNAME_LIMIT);
}
@ -66,47 +68,56 @@ public class FileNameIndexUtils {
int jobNameLimit) throws IOException {
StringBuilder sb = new StringBuilder();
//JobId
sb.append(escapeDelimiters(TypeConverter.fromYarn(indexInfo.getJobId()).toString()));
sb.append(encodeJobHistoryFileName(escapeDelimiters(
TypeConverter.fromYarn(indexInfo.getJobId()).toString())));
sb.append(DELIMITER);
//SubmitTime
sb.append(indexInfo.getSubmitTime());
sb.append(encodeJobHistoryFileName(String.valueOf(
indexInfo.getSubmitTime())));
sb.append(DELIMITER);
//UserName
sb.append(escapeDelimiters(getUserName(indexInfo)));
sb.append(encodeJobHistoryFileName(escapeDelimiters(
getUserName(indexInfo))));
sb.append(DELIMITER);
//JobName
sb.append(escapeDelimiters(trimJobName(
getJobName(indexInfo), jobNameLimit)));
sb.append(trimURLEncodedString(encodeJobHistoryFileName(escapeDelimiters(
getJobName(indexInfo))), jobNameLimit));
sb.append(DELIMITER);
//FinishTime
sb.append(indexInfo.getFinishTime());
sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getFinishTime())));
sb.append(DELIMITER);
//NumMaps
sb.append(indexInfo.getNumMaps());
sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getNumMaps())));
sb.append(DELIMITER);
//NumReduces
sb.append(indexInfo.getNumReduces());
sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getNumReduces())));
sb.append(DELIMITER);
//JobStatus
sb.append(indexInfo.getJobStatus());
sb.append(encodeJobHistoryFileName(indexInfo.getJobStatus()));
sb.append(DELIMITER);
//QueueName
sb.append(escapeDelimiters(getQueueName(indexInfo)));
sb.append(escapeDelimiters(encodeJobHistoryFileName(
getQueueName(indexInfo))));
sb.append(DELIMITER);
//JobStartTime
sb.append(indexInfo.getJobStartTime());
sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getJobStartTime())));
sb.append(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION);
return encodeJobHistoryFileName(sb.toString());
sb.append(encodeJobHistoryFileName(
JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
return sb.toString();
}
/**
@ -116,21 +127,24 @@ public class FileNameIndexUtils {
* @param jhFileName the job history filename.
* @return a JobIndexInfo object built from the filename.
*/
public static JobIndexInfo getIndexInfo(String jhFileName) throws IOException {
String fileName = jhFileName.substring(0, jhFileName.indexOf(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
public static JobIndexInfo getIndexInfo(String jhFileName)
throws IOException {
String fileName = jhFileName.substring(0,
jhFileName.indexOf(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
JobIndexInfo indexInfo = new JobIndexInfo();
String[] jobDetails = fileName.split(DELIMITER);
JobID oldJobId = JobID.forName(decodeJobHistoryFileName(jobDetails[JOB_ID_INDEX]));
JobID oldJobId =
JobID.forName(decodeJobHistoryFileName(jobDetails[JOB_ID_INDEX]));
JobId jobId = TypeConverter.toYarn(oldJobId);
indexInfo.setJobId(jobId);
// Do not fail if there are some minor parse errors
try {
try {
indexInfo.setSubmitTime(
Long.parseLong(decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX])));
indexInfo.setSubmitTime(Long.parseLong(
decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse submit time from job history file "
+ jhFileName + " : " + e);
@ -143,24 +157,24 @@ public class FileNameIndexUtils {
decodeJobHistoryFileName(jobDetails[JOB_NAME_INDEX]));
try {
indexInfo.setFinishTime(
Long.parseLong(decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX])));
indexInfo.setFinishTime(Long.parseLong(
decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse finish time from job history file "
+ jhFileName + " : " + e);
}
try {
indexInfo.setNumMaps(
Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX])));
indexInfo.setNumMaps(Integer.parseInt(
decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse num maps from job history file "
+ jhFileName + " : " + e);
}
try {
indexInfo.setNumReduces(
Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX])));
indexInfo.setNumReduces(Integer.parseInt(
decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse num reduces from job history file "
+ jhFileName + " : " + e);
@ -176,8 +190,8 @@ public class FileNameIndexUtils {
if (jobDetails.length <= JOB_START_TIME_INDEX) {
indexInfo.setJobStartTime(indexInfo.getSubmitTime());
} else {
indexInfo.setJobStartTime(
Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
indexInfo.setJobStartTime(Long.parseLong(
decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
}
} catch (NumberFormatException e){
LOG.warn("Unable to parse start time from job history file "
@ -208,7 +222,8 @@ public class FileNameIndexUtils {
if (logFileName.contains(DELIMITER_ESCAPE)) {
replacementDelimiterEscape = nonOccursString(logFileName);
logFileName = logFileName.replaceAll(DELIMITER_ESCAPE, replacementDelimiterEscape);
logFileName = logFileName.replaceAll(
DELIMITER_ESCAPE, replacementDelimiterEscape);
}
String encodedFileName = null;
@ -223,7 +238,8 @@ public class FileNameIndexUtils {
// Restore protected escape delimiters after encoding
if (replacementDelimiterEscape != null) {
encodedFileName = encodedFileName.replaceAll(replacementDelimiterEscape, DELIMITER_ESCAPE);
encodedFileName = encodedFileName.replaceAll(
replacementDelimiterEscape, DELIMITER_ESCAPE);
}
return encodedFileName;
@ -289,12 +305,59 @@ public class FileNameIndexUtils {
}
/**
* Trims the job-name if required
* Trims the url-encoded string if required
*/
private static String trimJobName(String jobName, int jobNameLimit) {
if (jobName.length() > jobNameLimit) {
jobName = jobName.substring(0, jobNameLimit);
private static String trimURLEncodedString(
String encodedString, int limitLength) {
assert(limitLength >= 0) : "limitLength should be positive integer";
if (encodedString.length() < limitLength) {
return encodedString;
}
return jobName;
int index = 0;
int increase = 0;
byte[] strBytes = encodedString.getBytes(UTF_8);
// calculate effective character length based on UTF-8 specification.
// The size of a character coded in UTF-8 should be 4-byte at most.
// See RFC3629
while (true) {
byte b = strBytes[index];
if (b == '%') {
byte minuend1 = strBytes[index + 1];
byte subtrahend1 = (byte)(Character.isDigit(
minuend1) ? '0' : 'A' - 10);
byte minuend2 = strBytes[index + 2];
byte subtrahend2 = (byte)(Character.isDigit(
minuend2) ? '0' : 'A' - 10);
int initialHex =
((Character.toUpperCase(minuend1) - subtrahend1) << 4) +
(Character.toUpperCase(minuend2) - subtrahend2);
if (0x00 <= initialHex && initialHex <= 0x7F) {
// For 1-byte UTF-8 characters
increase = 3;
} else if (0xC2 <= initialHex && initialHex <= 0xDF) {
// For 2-byte UTF-8 characters
increase = 6;
} else if (0xE0 <= initialHex && initialHex <= 0xEF) {
// For 3-byte UTF-8 characters
increase = 9;
} else {
// For 4-byte UTF-8 characters
increase = 12;
}
} else {
increase = 1;
}
if (index + increase > limitLength) {
break;
} else {
index += increase;
}
}
return encodedString.substring(0, index);
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.jobhistory;
import java.io.IOException;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TypeConverter;
@ -173,6 +174,158 @@ public class TestFileNameIndexUtils {
parsedInfo.getJobName());
}
/**
* Verify the name of jobhistory file is not greater than 255 bytes
* even if there are some multibyte characters in the job name.
*/
@Test
public void testJobNameWithMultibyteChars() throws IOException {
JobIndexInfo info = new JobIndexInfo();
JobID oldJobId = JobID.forName(JOB_ID);
JobId jobId = TypeConverter.toYarn(oldJobId);
info.setJobId(jobId);
info.setSubmitTime(Long.parseLong(SUBMIT_TIME));
info.setUser(USER_NAME);
StringBuilder sb = new StringBuilder();
info.setFinishTime(Long.parseLong(FINISH_TIME));
info.setNumMaps(Integer.parseInt(NUM_MAPS));
info.setNumReduces(Integer.parseInt(NUM_REDUCES));
info.setJobStatus(JOB_STATUS);
info.setQueueName(QUEUE_NAME);
info.setJobStartTime(Long.parseLong(JOB_START_TIME));
// Test for 1 byte UTF-8 character
// which is encoded into 1 x 3 = 3 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append('%');
}
String longJobName = sb.toString();
info.setJobName(longJobName);
String jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 50);
Assert.assertTrue(jobHistoryFile.length() <= 255);
String trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 3 x 16 < 50 < 3 x 17 so the length of trimedJobName should be 48
Assert.assertEquals(48, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
byte[] trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
String reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for 2 bytes UTF-8 character
// which is encoded into 2 x 3 = 6 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append('\u03A9'); // large omega
}
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 27);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 6 x 4 < 27 < 6 x 5 so the length of trimedJobName should be 24
Assert.assertEquals(24, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for 3 bytes UTF-8 character
// which is encoded into 3 x 3 = 9 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append('\u2192'); // rightwards arrow
}
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 40);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 9 x 4 < 40 < 9 x 5 so the length of trimedJobName should be 36
Assert.assertEquals(36, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for 4 bytes UTF-8 character
// which is encoded into 4 x 3 = 12 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append("\uD867\uDE3D"); // Mugil cephalus in Kanji.
}
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 49);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 12 x 4 < 49 < 12 x 5 so the length of trimedJobName should be 48
Assert.assertEquals(48, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for the combination of 1 to 4 bytes UTF-8 characters
sb.append('\u732B') // cat in Kanji (encoded into 3 bytes x 3 characters)
.append("[") // (encoded into 1 byte x 3 characters)
.append('\u03BB') // small lambda (encoded into 2 bytes x 3 characters)
.append('/') // (encoded into 1 byte x 3 characters)
.append('A') // not url-encoded (1 byte x 1 character)
.append("\ud867\ude49") // flying fish in
// Kanji (encoded into 4 bytes x 3 characters)
.append('\u72AC'); // dog in Kanji (encoded into 3 bytes x 3 characters)
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 23);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// total size of the first 5 characters = 22
// 23 < total size of the first 6 characters
Assert.assertEquals(22, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
}
@Test
public void testUserNamePercentDecoding() throws IOException {
String jobHistoryFile = String.format(JOB_HISTORY_FILE_FORMATTER,