YARN-1578. Fixed reading incomplete application attempt and container data in FileSystemApplicationHistoryStore. Contributed by Shinichi Yamashita.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1567816 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-02-13 00:08:47 +00:00
parent 2624b20291
commit 0185a57847
3 changed files with 85 additions and 101 deletions

View File

@ -259,6 +259,9 @@ Release 2.4.0 - UNRELEASED
YARN-1692. ConcurrentModificationException in fair scheduler AppSchedulable
(Sangjin Lee via Sandy Ryza)
YARN-1578. Fixed reading incomplete application attempt and container data
in FileSystemApplicationHistoryStore. (Shinichi Yamashita via zjshen)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -215,17 +215,30 @@ public class FileSystemApplicationHistoryStore extends AbstractService
getApplicationAttempts(ApplicationId appId) throws IOException {
Map<ApplicationAttemptId, ApplicationAttemptHistoryData> historyDataMap =
new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>();
Map<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>> startFinshDataMap =
new HashMap<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>>();
HistoryFileReader hfReader = getHistoryFileReader(appId);
try {
while (hfReader.hasNext()) {
HistoryFileReader.Entry entry = hfReader.next();
if (entry.key.id.startsWith(ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) {
if (entry.key.id.startsWith(
ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) {
ApplicationAttemptId appAttemptId =
ConverterUtils.toApplicationAttemptId(entry.key.id);
if (appAttemptId.getApplicationId().equals(appId)) {
ApplicationAttemptHistoryData historyData =
historyDataMap.get(appAttemptId);
if (historyData == null) {
historyData = ApplicationAttemptHistoryData.newInstance(
appAttemptId, null, -1, null, null, null,
FinalApplicationStatus.UNDEFINED, null);
historyDataMap.put(appAttemptId, historyData);
}
if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
retrieveStartFinishData(appId, entry, startFinshDataMap, true);
mergeApplicationAttemptHistoryData(historyData,
parseApplicationAttemptStartData(entry.value));
} else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
retrieveStartFinishData(appId, entry, startFinshDataMap, false);
mergeApplicationAttemptHistoryData(historyData,
parseApplicationAttemptFinishData(entry.value));
}
}
}
}
@ -237,45 +250,9 @@ public class FileSystemApplicationHistoryStore extends AbstractService
} finally {
hfReader.close();
}
for (Map.Entry<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>> entry : startFinshDataMap
.entrySet()) {
ApplicationAttemptHistoryData historyData =
ApplicationAttemptHistoryData.newInstance(entry.getKey(), null, -1,
null, null, null, FinalApplicationStatus.UNDEFINED, null);
mergeApplicationAttemptHistoryData(historyData,
entry.getValue().startData);
mergeApplicationAttemptHistoryData(historyData,
entry.getValue().finishData);
historyDataMap.put(entry.getKey(), historyData);
}
return historyDataMap;
}
private
void
retrieveStartFinishData(
ApplicationId appId,
HistoryFileReader.Entry entry,
Map<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>> startFinshDataMap,
boolean start) throws IOException {
ApplicationAttemptId appAttemptId =
ConverterUtils.toApplicationAttemptId(entry.key.id);
if (appAttemptId.getApplicationId().equals(appId)) {
StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData> pair =
startFinshDataMap.get(appAttemptId);
if (pair == null) {
pair =
new StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>();
startFinshDataMap.put(appAttemptId, pair);
}
if (start) {
pair.startData = parseApplicationAttemptStartData(entry.value);
} else {
pair.finishData = parseApplicationAttemptFinishData(entry.value);
}
}
}
@Override
public ApplicationAttemptHistoryData getApplicationAttempt(
ApplicationAttemptId appAttemptId) throws IOException {
@ -391,20 +368,30 @@ public class FileSystemApplicationHistoryStore extends AbstractService
ApplicationAttemptId appAttemptId) throws IOException {
Map<ContainerId, ContainerHistoryData> historyDataMap =
new HashMap<ContainerId, ContainerHistoryData>();
Map<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> startFinshDataMap =
new HashMap<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>>();
HistoryFileReader hfReader =
getHistoryFileReader(appAttemptId.getApplicationId());
try {
while (hfReader.hasNext()) {
HistoryFileReader.Entry entry = hfReader.next();
if (entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
ContainerId containerId =
ConverterUtils.toContainerId(entry.key.id);
if (containerId.getApplicationAttemptId().equals(appAttemptId)) {
ContainerHistoryData historyData =
historyDataMap.get(containerId);
if (historyData == null) {
historyData = ContainerHistoryData.newInstance(
containerId, null, null, null, Long.MIN_VALUE,
Long.MAX_VALUE, null, null, Integer.MAX_VALUE, null);
historyDataMap.put(containerId, historyData);
}
if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
retrieveStartFinishData(appAttemptId, entry, startFinshDataMap,
true);
mergeContainerHistoryData(historyData,
parseContainerStartData(entry.value));
} else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
retrieveStartFinishData(appAttemptId, entry, startFinshDataMap,
false);
mergeContainerHistoryData(historyData,
parseContainerFinishData(entry.value));
}
}
}
}
@ -416,43 +403,9 @@ public class FileSystemApplicationHistoryStore extends AbstractService
} finally {
hfReader.close();
}
for (Map.Entry<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> entry : startFinshDataMap
.entrySet()) {
ContainerHistoryData historyData =
ContainerHistoryData
.newInstance(entry.getKey(), null, null, null, Long.MIN_VALUE,
Long.MAX_VALUE, null, null, Integer.MAX_VALUE, null);
mergeContainerHistoryData(historyData, entry.getValue().startData);
mergeContainerHistoryData(historyData, entry.getValue().finishData);
historyDataMap.put(entry.getKey(), historyData);
}
return historyDataMap;
}
private
void
retrieveStartFinishData(
ApplicationAttemptId appAttemptId,
HistoryFileReader.Entry entry,
Map<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> startFinshDataMap,
boolean start) throws IOException {
ContainerId containerId = ConverterUtils.toContainerId(entry.key.id);
if (containerId.getApplicationAttemptId().equals(appAttemptId)) {
StartFinishDataPair<ContainerStartData, ContainerFinishData> pair =
startFinshDataMap.get(containerId);
if (pair == null) {
pair =
new StartFinishDataPair<ContainerStartData, ContainerFinishData>();
startFinshDataMap.put(containerId, pair);
}
if (start) {
pair.startData = parseContainerStartData(entry.value);
} else {
pair.finishData = parseContainerFinishData(entry.value);
}
}
}
@Override
public void applicationStarted(ApplicationStartData appStart)
throws IOException {
@ -828,14 +781,5 @@ public class FileSystemApplicationHistoryStore extends AbstractService
id = in.readUTF();
suffix = in.readUTF();
}
}
private static class StartFinishDataPair<S, F> {
private S startData;
private F finishData;
}
}

View File

@ -72,6 +72,12 @@ public class TestFileSystemApplicationHistoryStore extends
}
private void testWriteHistoryData(int num) throws IOException {
testWriteHistoryData(num, false, false);
}
private void testWriteHistoryData(
int num, boolean missingContainer, boolean missingApplicationAttempt)
throws IOException {
// write application history data
for (int i = 1; i <= num; ++i) {
ApplicationId appId = ApplicationId.newInstance(0, i);
@ -83,21 +89,31 @@ public class TestFileSystemApplicationHistoryStore extends
ApplicationAttemptId.newInstance(appId, j);
writeApplicationAttemptStartData(appAttemptId);
if (missingApplicationAttempt && j == num) {
continue;
}
// write container history data
for (int k = 1; k <= num; ++k) {
ContainerId containerId = ContainerId.newInstance(appAttemptId, k);
writeContainerStartData(containerId);
if (missingContainer && k == num) {
continue;
}
writeContainerFinishData(containerId);
}
writeApplicationAttemptFinishData(appAttemptId);
}
}
writeApplicationFinishData(appId);
}
}
private void testReadHistoryData(int num) throws IOException {
testReadHistoryData(num, false, false);
}
private void testReadHistoryData(
int num, boolean missingContainer, boolean missingApplicationAttempt)
throws IOException {
// read application history data
Assert.assertEquals(num, store.getAllApplications().size());
for (int i = 1; i <= num; ++i) {
@ -116,8 +132,14 @@ public class TestFileSystemApplicationHistoryStore extends
store.getApplicationAttempt(appAttemptId);
Assert.assertNotNull(attemptData);
Assert.assertEquals(appAttemptId.toString(), attemptData.getHost());
if (missingApplicationAttempt && j == num) {
Assert.assertNull(attemptData.getDiagnosticsInfo());
continue;
} else {
Assert.assertEquals(appAttemptId.toString(),
attemptData.getDiagnosticsInfo());
}
// read container history data
Assert.assertEquals(num, store.getContainers(appAttemptId).size());
@ -127,9 +149,13 @@ public class TestFileSystemApplicationHistoryStore extends
Assert.assertNotNull(containerData);
Assert.assertEquals(Priority.newInstance(containerId.getId()),
containerData.getPriority());
if (missingContainer && k == num) {
Assert.assertNull(containerData.getDiagnosticsInfo());
} else {
Assert.assertEquals(containerId.toString(),
containerData.getDiagnosticsInfo());
}
}
ContainerHistoryData masterContainer =
store.getAMContainer(appAttemptId);
Assert.assertNotNull(masterContainer);
@ -193,4 +219,15 @@ public class TestFileSystemApplicationHistoryStore extends
Assert.assertTrue((usedDiskAfter - usedDiskBefore) < 20);
}
@Test
public void testMissingContainerHistoryData() throws IOException {
testWriteHistoryData(3, true, false);
testReadHistoryData(3, true, false);
}
@Test
public void testMissingApplicationAttemptHistoryData() throws IOException {
testWriteHistoryData(3, false, true);
testReadHistoryData(3, false, true);
}
}