YARN-1578. Fixed reading incomplete application attempt and container data in FileSystemApplicationHistoryStore. Contributed by Shinichi Yamashita.
svn merge --ignore-ancestry -c 1567816 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1567817 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3b1dcfd7a6
commit
c244093fe2
|
@ -244,6 +244,9 @@ Release 2.4.0 - UNRELEASED
|
|||
YARN-1692. ConcurrentModificationException in fair scheduler AppSchedulable
|
||||
(Sangjin Lee via Sandy Ryza)
|
||||
|
||||
YARN-1578. Fixed reading incomplete application attempt and container data
|
||||
in FileSystemApplicationHistoryStore. (Shinichi Yamashita via zjshen)
|
||||
|
||||
Release 2.3.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -215,17 +215,30 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|||
getApplicationAttempts(ApplicationId appId) throws IOException {
|
||||
Map<ApplicationAttemptId, ApplicationAttemptHistoryData> historyDataMap =
|
||||
new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>();
|
||||
Map<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>> startFinshDataMap =
|
||||
new HashMap<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>>();
|
||||
HistoryFileReader hfReader = getHistoryFileReader(appId);
|
||||
try {
|
||||
while (hfReader.hasNext()) {
|
||||
HistoryFileReader.Entry entry = hfReader.next();
|
||||
if (entry.key.id.startsWith(ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) {
|
||||
if (entry.key.id.startsWith(
|
||||
ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) {
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ConverterUtils.toApplicationAttemptId(entry.key.id);
|
||||
if (appAttemptId.getApplicationId().equals(appId)) {
|
||||
ApplicationAttemptHistoryData historyData =
|
||||
historyDataMap.get(appAttemptId);
|
||||
if (historyData == null) {
|
||||
historyData = ApplicationAttemptHistoryData.newInstance(
|
||||
appAttemptId, null, -1, null, null, null,
|
||||
FinalApplicationStatus.UNDEFINED, null);
|
||||
historyDataMap.put(appAttemptId, historyData);
|
||||
}
|
||||
if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
|
||||
retrieveStartFinishData(appId, entry, startFinshDataMap, true);
|
||||
mergeApplicationAttemptHistoryData(historyData,
|
||||
parseApplicationAttemptStartData(entry.value));
|
||||
} else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
|
||||
retrieveStartFinishData(appId, entry, startFinshDataMap, false);
|
||||
mergeApplicationAttemptHistoryData(historyData,
|
||||
parseApplicationAttemptFinishData(entry.value));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -237,45 +250,9 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|||
} finally {
|
||||
hfReader.close();
|
||||
}
|
||||
for (Map.Entry<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>> entry : startFinshDataMap
|
||||
.entrySet()) {
|
||||
ApplicationAttemptHistoryData historyData =
|
||||
ApplicationAttemptHistoryData.newInstance(entry.getKey(), null, -1,
|
||||
null, null, null, FinalApplicationStatus.UNDEFINED, null);
|
||||
mergeApplicationAttemptHistoryData(historyData,
|
||||
entry.getValue().startData);
|
||||
mergeApplicationAttemptHistoryData(historyData,
|
||||
entry.getValue().finishData);
|
||||
historyDataMap.put(entry.getKey(), historyData);
|
||||
}
|
||||
return historyDataMap;
|
||||
}
|
||||
|
||||
private
|
||||
void
|
||||
retrieveStartFinishData(
|
||||
ApplicationId appId,
|
||||
HistoryFileReader.Entry entry,
|
||||
Map<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>> startFinshDataMap,
|
||||
boolean start) throws IOException {
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ConverterUtils.toApplicationAttemptId(entry.key.id);
|
||||
if (appAttemptId.getApplicationId().equals(appId)) {
|
||||
StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData> pair =
|
||||
startFinshDataMap.get(appAttemptId);
|
||||
if (pair == null) {
|
||||
pair =
|
||||
new StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>();
|
||||
startFinshDataMap.put(appAttemptId, pair);
|
||||
}
|
||||
if (start) {
|
||||
pair.startData = parseApplicationAttemptStartData(entry.value);
|
||||
} else {
|
||||
pair.finishData = parseApplicationAttemptFinishData(entry.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationAttemptHistoryData getApplicationAttempt(
|
||||
ApplicationAttemptId appAttemptId) throws IOException {
|
||||
|
@ -391,20 +368,30 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|||
ApplicationAttemptId appAttemptId) throws IOException {
|
||||
Map<ContainerId, ContainerHistoryData> historyDataMap =
|
||||
new HashMap<ContainerId, ContainerHistoryData>();
|
||||
Map<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> startFinshDataMap =
|
||||
new HashMap<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>>();
|
||||
HistoryFileReader hfReader =
|
||||
getHistoryFileReader(appAttemptId.getApplicationId());
|
||||
try {
|
||||
while (hfReader.hasNext()) {
|
||||
HistoryFileReader.Entry entry = hfReader.next();
|
||||
if (entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
|
||||
ContainerId containerId =
|
||||
ConverterUtils.toContainerId(entry.key.id);
|
||||
if (containerId.getApplicationAttemptId().equals(appAttemptId)) {
|
||||
ContainerHistoryData historyData =
|
||||
historyDataMap.get(containerId);
|
||||
if (historyData == null) {
|
||||
historyData = ContainerHistoryData.newInstance(
|
||||
containerId, null, null, null, Long.MIN_VALUE,
|
||||
Long.MAX_VALUE, null, null, Integer.MAX_VALUE, null);
|
||||
historyDataMap.put(containerId, historyData);
|
||||
}
|
||||
if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
|
||||
retrieveStartFinishData(appAttemptId, entry, startFinshDataMap,
|
||||
true);
|
||||
mergeContainerHistoryData(historyData,
|
||||
parseContainerStartData(entry.value));
|
||||
} else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
|
||||
retrieveStartFinishData(appAttemptId, entry, startFinshDataMap,
|
||||
false);
|
||||
mergeContainerHistoryData(historyData,
|
||||
parseContainerFinishData(entry.value));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -416,43 +403,9 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|||
} finally {
|
||||
hfReader.close();
|
||||
}
|
||||
for (Map.Entry<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> entry : startFinshDataMap
|
||||
.entrySet()) {
|
||||
ContainerHistoryData historyData =
|
||||
ContainerHistoryData
|
||||
.newInstance(entry.getKey(), null, null, null, Long.MIN_VALUE,
|
||||
Long.MAX_VALUE, null, null, Integer.MAX_VALUE, null);
|
||||
mergeContainerHistoryData(historyData, entry.getValue().startData);
|
||||
mergeContainerHistoryData(historyData, entry.getValue().finishData);
|
||||
historyDataMap.put(entry.getKey(), historyData);
|
||||
}
|
||||
return historyDataMap;
|
||||
}
|
||||
|
||||
private
|
||||
void
|
||||
retrieveStartFinishData(
|
||||
ApplicationAttemptId appAttemptId,
|
||||
HistoryFileReader.Entry entry,
|
||||
Map<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> startFinshDataMap,
|
||||
boolean start) throws IOException {
|
||||
ContainerId containerId = ConverterUtils.toContainerId(entry.key.id);
|
||||
if (containerId.getApplicationAttemptId().equals(appAttemptId)) {
|
||||
StartFinishDataPair<ContainerStartData, ContainerFinishData> pair =
|
||||
startFinshDataMap.get(containerId);
|
||||
if (pair == null) {
|
||||
pair =
|
||||
new StartFinishDataPair<ContainerStartData, ContainerFinishData>();
|
||||
startFinshDataMap.put(containerId, pair);
|
||||
}
|
||||
if (start) {
|
||||
pair.startData = parseContainerStartData(entry.value);
|
||||
} else {
|
||||
pair.finishData = parseContainerFinishData(entry.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applicationStarted(ApplicationStartData appStart)
|
||||
throws IOException {
|
||||
|
@ -828,14 +781,5 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|||
id = in.readUTF();
|
||||
suffix = in.readUTF();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class StartFinishDataPair<S, F> {
|
||||
|
||||
private S startData;
|
||||
private F finishData;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -72,6 +72,12 @@ public class TestFileSystemApplicationHistoryStore extends
|
|||
}
|
||||
|
||||
private void testWriteHistoryData(int num) throws IOException {
|
||||
testWriteHistoryData(num, false, false);
|
||||
}
|
||||
|
||||
private void testWriteHistoryData(
|
||||
int num, boolean missingContainer, boolean missingApplicationAttempt)
|
||||
throws IOException {
|
||||
// write application history data
|
||||
for (int i = 1; i <= num; ++i) {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, i);
|
||||
|
@ -83,21 +89,31 @@ public class TestFileSystemApplicationHistoryStore extends
|
|||
ApplicationAttemptId.newInstance(appId, j);
|
||||
writeApplicationAttemptStartData(appAttemptId);
|
||||
|
||||
if (missingApplicationAttempt && j == num) {
|
||||
continue;
|
||||
}
|
||||
// write container history data
|
||||
for (int k = 1; k <= num; ++k) {
|
||||
ContainerId containerId = ContainerId.newInstance(appAttemptId, k);
|
||||
writeContainerStartData(containerId);
|
||||
if (missingContainer && k == num) {
|
||||
continue;
|
||||
}
|
||||
writeContainerFinishData(containerId);
|
||||
|
||||
}
|
||||
writeApplicationAttemptFinishData(appAttemptId);
|
||||
}
|
||||
}
|
||||
|
||||
writeApplicationFinishData(appId);
|
||||
}
|
||||
}
|
||||
|
||||
private void testReadHistoryData(int num) throws IOException {
|
||||
testReadHistoryData(num, false, false);
|
||||
}
|
||||
|
||||
private void testReadHistoryData(
|
||||
int num, boolean missingContainer, boolean missingApplicationAttempt)
|
||||
throws IOException {
|
||||
// read application history data
|
||||
Assert.assertEquals(num, store.getAllApplications().size());
|
||||
for (int i = 1; i <= num; ++i) {
|
||||
|
@ -116,8 +132,14 @@ public class TestFileSystemApplicationHistoryStore extends
|
|||
store.getApplicationAttempt(appAttemptId);
|
||||
Assert.assertNotNull(attemptData);
|
||||
Assert.assertEquals(appAttemptId.toString(), attemptData.getHost());
|
||||
|
||||
if (missingApplicationAttempt && j == num) {
|
||||
Assert.assertNull(attemptData.getDiagnosticsInfo());
|
||||
continue;
|
||||
} else {
|
||||
Assert.assertEquals(appAttemptId.toString(),
|
||||
attemptData.getDiagnosticsInfo());
|
||||
}
|
||||
|
||||
// read container history data
|
||||
Assert.assertEquals(num, store.getContainers(appAttemptId).size());
|
||||
|
@ -127,9 +149,13 @@ public class TestFileSystemApplicationHistoryStore extends
|
|||
Assert.assertNotNull(containerData);
|
||||
Assert.assertEquals(Priority.newInstance(containerId.getId()),
|
||||
containerData.getPriority());
|
||||
if (missingContainer && k == num) {
|
||||
Assert.assertNull(containerData.getDiagnosticsInfo());
|
||||
} else {
|
||||
Assert.assertEquals(containerId.toString(),
|
||||
containerData.getDiagnosticsInfo());
|
||||
}
|
||||
}
|
||||
ContainerHistoryData masterContainer =
|
||||
store.getAMContainer(appAttemptId);
|
||||
Assert.assertNotNull(masterContainer);
|
||||
|
@ -193,4 +219,15 @@ public class TestFileSystemApplicationHistoryStore extends
|
|||
Assert.assertTrue((usedDiskAfter - usedDiskBefore) < 20);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingContainerHistoryData() throws IOException {
|
||||
testWriteHistoryData(3, true, false);
|
||||
testReadHistoryData(3, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingApplicationAttemptHistoryData() throws IOException {
|
||||
testWriteHistoryData(3, false, true);
|
||||
testReadHistoryData(3, false, true);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue