YARN-7083. Log aggregation deletes/renames while file is open. Contributed by Jason Lowe.
This commit is contained in:
parent
fe4aff7330
commit
79294b5f32
@ -320,53 +320,54 @@ private void uploadLogsForContainers(boolean appFinished) {
|
||||
logAggregationTimes++;
|
||||
String diagnosticMessage = "";
|
||||
boolean logAggregationSucceedInThisCycle = true;
|
||||
try (LogWriter writer = new LogWriter()){
|
||||
try {
|
||||
writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp,
|
||||
this.userUgi);
|
||||
// Write ACLs once when the writer is created.
|
||||
writer.writeApplicationACLs(appAcls);
|
||||
writer.writeApplicationOwner(this.userUgi.getShortUserName());
|
||||
} catch (IOException e1) {
|
||||
logAggregationSucceedInThisCycle = false;
|
||||
LOG.error("Cannot create writer for app " + this.applicationId
|
||||
+ ". Skip log upload this time. ", e1);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
boolean uploadedLogsInThisCycle = false;
|
||||
for (ContainerId container : pendingContainerInThisCycle) {
|
||||
ContainerLogAggregator aggregator = null;
|
||||
if (containerLogAggregators.containsKey(container)) {
|
||||
aggregator = containerLogAggregators.get(container);
|
||||
} else {
|
||||
aggregator = new ContainerLogAggregator(container);
|
||||
containerLogAggregators.put(container, aggregator);
|
||||
}
|
||||
Set<Path> uploadedFilePathsInThisCycle =
|
||||
aggregator.doContainerLogAggregation(writer, appFinished,
|
||||
finishedContainers.contains(container));
|
||||
if (uploadedFilePathsInThisCycle.size() > 0) {
|
||||
uploadedLogsInThisCycle = true;
|
||||
this.delService.delete(this.userUgi.getShortUserName(), null,
|
||||
uploadedFilePathsInThisCycle
|
||||
.toArray(new Path[uploadedFilePathsInThisCycle.size()]));
|
||||
try (LogWriter writer = new LogWriter()){
|
||||
try {
|
||||
writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp,
|
||||
this.userUgi);
|
||||
// Write ACLs once when the writer is created.
|
||||
writer.writeApplicationACLs(appAcls);
|
||||
writer.writeApplicationOwner(this.userUgi.getShortUserName());
|
||||
} catch (IOException e1) {
|
||||
logAggregationSucceedInThisCycle = false;
|
||||
LOG.error("Cannot create writer for app " + this.applicationId
|
||||
+ ". Skip log upload this time. ", e1);
|
||||
return;
|
||||
}
|
||||
|
||||
// This container is finished, and all its logs have been uploaded,
|
||||
// remove it from containerLogAggregators.
|
||||
if (finishedContainers.contains(container)) {
|
||||
containerLogAggregators.remove(container);
|
||||
for (ContainerId container : pendingContainerInThisCycle) {
|
||||
ContainerLogAggregator aggregator = null;
|
||||
if (containerLogAggregators.containsKey(container)) {
|
||||
aggregator = containerLogAggregators.get(container);
|
||||
} else {
|
||||
aggregator = new ContainerLogAggregator(container);
|
||||
containerLogAggregators.put(container, aggregator);
|
||||
}
|
||||
Set<Path> uploadedFilePathsInThisCycle =
|
||||
aggregator.doContainerLogAggregation(writer, appFinished,
|
||||
finishedContainers.contains(container));
|
||||
if (uploadedFilePathsInThisCycle.size() > 0) {
|
||||
uploadedLogsInThisCycle = true;
|
||||
this.delService.delete(this.userUgi.getShortUserName(), null,
|
||||
uploadedFilePathsInThisCycle
|
||||
.toArray(new Path[uploadedFilePathsInThisCycle.size()]));
|
||||
}
|
||||
|
||||
// This container is finished, and all its logs have been uploaded,
|
||||
// remove it from containerLogAggregators.
|
||||
if (finishedContainers.contains(container)) {
|
||||
containerLogAggregators.remove(container);
|
||||
}
|
||||
}
|
||||
|
||||
// Before upload logs, make sure the number of existing logs
|
||||
// is smaller than the configured NM log aggregation retention size.
|
||||
if (uploadedLogsInThisCycle && logAggregationInRolling) {
|
||||
cleanOldLogs();
|
||||
cleanupOldLogTimes++;
|
||||
}
|
||||
}
|
||||
|
||||
// Before upload logs, make sure the number of existing logs
|
||||
// is smaller than the configured NM log aggregation retention size.
|
||||
if (uploadedLogsInThisCycle && logAggregationInRolling) {
|
||||
cleanOldLogs();
|
||||
cleanupOldLogTimes++;
|
||||
}
|
||||
|
||||
long currentTime = System.currentTimeMillis();
|
||||
final Path renamedPath = this.rollingMonitorInterval <= 0
|
||||
? remoteNodeLogFileForApp : new Path(
|
||||
|
Loading…
x
Reference in New Issue
Block a user