YARN-8584. Several typos in Log Aggregation related classes. Contributed by Szilard Nemeth.
(cherry picked from commit 2b39ad2698)

parent 6812535b0b
commit 8cd2a73777
@@ -258,7 +258,7 @@ public class AggregatedLogDeletionService extends AbstractService {
       return;
     }
     setLogAggCheckIntervalMsecs(retentionSecs);
-    task = new LogDeletionTask(conf, retentionSecs, creatRMClient());
+    task = new LogDeletionTask(conf, retentionSecs, createRMClient());
     timer = new Timer();
     timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
   }
@@ -281,7 +281,7 @@ public class AggregatedLogDeletionService extends AbstractService {
   // We have already marked ApplicationClientProtocol.getApplicationReport
   // as @Idempotent, it will automatically take care of RM restart/failover.
   @VisibleForTesting
-  protected ApplicationClientProtocol creatRMClient() throws IOException {
+  protected ApplicationClientProtocol createRMClient() throws IOException {
     return ClientRMProxy.createRMProxy(getConfig(),
         ApplicationClientProtocol.class);
   }

@@ -177,7 +177,7 @@ public class AggregatedLogFormat {
      * The set of log files that are older than retention policy that will
      * not be uploaded but ready for deletion.
      */
-    private final Set<File> obseleteRetentionLogFiles = new HashSet<File>();
+    private final Set<File> obsoleteRetentionLogFiles = new HashSet<File>();
 
     // TODO Maybe add a version string here. Instead of changing the version of
     // the entire k-v format
@@ -323,7 +323,7 @@ public class AggregatedLogFormat {
       // if log files are older than retention policy, do not upload them.
       // but schedule them for deletion.
       if(logRetentionContext != null && !logRetentionContext.shouldRetainLog()){
-        obseleteRetentionLogFiles.addAll(candidates);
+        obsoleteRetentionLogFiles.addAll(candidates);
         candidates.clear();
         return candidates;
       }
@@ -395,9 +395,9 @@ public class AggregatedLogFormat {
       return info;
     }
 
-    public Set<Path> getObseleteRetentionLogFiles() {
+    public Set<Path> getObsoleteRetentionLogFiles() {
       Set<Path> path = new HashSet<Path>();
-      for(File file: this.obseleteRetentionLogFiles) {
+      for(File file: this.obsoleteRetentionLogFiles) {
        path.add(new Path(file.getAbsolutePath()));
       }
       return path;

@@ -115,16 +115,16 @@ public abstract class LogAggregationFileController {
    */
   public void initialize(Configuration conf, String controllerName) {
     this.conf = conf;
-    int configuredRentionSize = conf.getInt(
+    int configuredRetentionSize = conf.getInt(
         YarnConfiguration.NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
         YarnConfiguration
             .DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
-    if (configuredRentionSize <= 0) {
+    if (configuredRetentionSize <= 0) {
       this.retentionSize =
           YarnConfiguration
               .DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
     } else {
-      this.retentionSize = configuredRentionSize;
+      this.retentionSize = configuredRetentionSize;
     }
     this.fileControllerName = controllerName;
     initInternal(conf);

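Aside on the setting this hunk renames: configuredRetentionSize caps how many aggregated log files the file controller keeps per application, and anything <= 0 falls back to the default. A minimal sketch of overriding it programmatically, assuming only the YarnConfiguration constant visible in the hunk above (the value 30 is an arbitrary illustration, not a recommendation):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    // Sketch: raise the per-app cap on retained aggregated log files.
    Configuration conf = new YarnConfiguration();
    conf.setInt(
        YarnConfiguration.NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP, 30);
    // initialize(conf, controllerName) above then reads this value;
    // anything <= 0 falls back to the YarnConfiguration default.
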
@@ -187,8 +187,8 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
       FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
       int bufferSize = 65536;
       for (IndexedFileLogMeta candidate : candidates) {
-        if (candidate.getLastModificatedTime() < startTime
-            || candidate.getLastModificatedTime() > endTime) {
+        if (candidate.getLastModifiedTime() < startTime
+            || candidate.getLastModifiedTime() > endTime) {
           continue;
         }
         byte[] cbuf = new byte[bufferSize];
@@ -205,7 +205,7 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
         html.pre().__("\n\n").__();
         html.p().__("Log Type: " + candidate.getFileName()).__();
         html.p().__("Log Upload Time: " + Times.format(
-            candidate.getLastModificatedTime())).__();
+            candidate.getLastModifiedTime())).__();
         html.p().__("Log Length: " + Long.toString(
             logLength)).__();
         long startIndex = start < 0

@@ -404,7 +404,7 @@ public class LogAggregationIndexedFileController
           meta.setStartIndex(outputStreamState.getStartPos());
           meta.setFileSize(fileLength);
         }
-        meta.setLastModificatedTime(logFile.lastModified());
+        meta.setLastModifiedTime(logFile.lastModified());
         metas.add(meta);
       }
       logsMetaInThisCycle.addContainerLogMeta(containerId, metas);
@@ -499,12 +499,12 @@ public class LogAggregationIndexedFileController
         .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(),
             this.remoteRootLogDir, this.remoteRootLogDirSuffix);
     if (!nodeFiles.hasNext()) {
-      throw new IOException("There is no available log fils for "
+      throw new IOException("There is no available log file for "
          + "application:" + appId);
     }
     List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
     if (allFiles.isEmpty()) {
-      throw new IOException("There is no available log fils for "
+      throw new IOException("There is no available log file for "
          + "application:" + appId);
     }
     Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
@@ -581,7 +581,7 @@ public class LogAggregationIndexedFileController
                 decompressor, getFSInputBufferSize(conf));
             LogToolUtils.outputContainerLog(candidate.getContainerId(),
                 nodeName, candidate.getFileName(), candidate.getFileSize(), size,
-                Times.format(candidate.getLastModificatedTime()),
+                Times.format(candidate.getLastModifiedTime()),
                 in, os, buf, ContainerLogAggregationType.AGGREGATED);
             byte[] b = aggregatedLogSuffix(candidate.getFileName())
                 .getBytes(Charset.forName("UTF-8"));
@@ -618,12 +618,12 @@ public class LogAggregationIndexedFileController
         .getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir,
             this.remoteRootLogDirSuffix);
     if (!nodeFiles.hasNext()) {
-      throw new IOException("There is no available log fils for "
+      throw new IOException("There is no available log file for "
          + "application:" + appId);
     }
     List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
     if (allFiles.isEmpty()) {
-      throw new IOException("There is no available log fils for "
+      throw new IOException("There is no available log file for "
          + "application:" + appId);
     }
     Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
@@ -660,7 +660,7 @@ public class LogAggregationIndexedFileController
         for (IndexedFileLogMeta aMeta : log.getValue()) {
           meta.addLogMeta(aMeta.getFileName(), Long.toString(
               aMeta.getFileSize()),
-              Times.format(aMeta.getLastModificatedTime()));
+              Times.format(aMeta.getLastModifiedTime()));
         }
         containersLogMeta.add(meta);
       }
@@ -671,7 +671,7 @@ public class LogAggregationIndexedFileController
           logMeta.getContainerLogMeta(containerIdStr)) {
         meta.addLogMeta(log.getFileName(), Long.toString(
             log.getFileSize()),
-            Times.format(log.getLastModificatedTime()));
+            Times.format(log.getLastModifiedTime()));
       }
       containersLogMeta.add(meta);
     }
@@ -1002,7 +1002,7 @@ public class LogAggregationIndexedFileController
     private String fileName;
     private long fileSize;
     private long fileCompressedSize;
-    private long lastModificatedTime;
+    private long lastModifiedTime;
     private long startIndex;
 
     public String getFileName() {
@@ -1026,11 +1026,11 @@ public class LogAggregationIndexedFileController
       this.fileCompressedSize = fileCompressedSize;
     }
 
-    public long getLastModificatedTime() {
-      return lastModificatedTime;
+    public long getLastModifiedTime() {
+      return lastModifiedTime;
     }
-    public void setLastModificatedTime(long lastModificatedTime) {
-      this.lastModificatedTime = lastModificatedTime;
+    public void setLastModifiedTime(long lastModifiedTime) {
+      this.lastModifiedTime = lastModifiedTime;
     }
 
     public long getStartIndex() {

@@ -268,7 +268,7 @@ public class LogAggregationTFileController
     RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
         .getRemoteNodeFileDir(conf, appId, appOwner);
     if (nodeFiles == null) {
-      throw new IOException("There is no available log fils for "
+      throw new IOException("There is no available log file for "
          + "application:" + appId);
     }
     while (nodeFiles.hasNext()) {

@@ -160,7 +160,7 @@ public class TestAggregatedLogDeletionService {
     AggregatedLogDeletionService deletionService =
         new AggregatedLogDeletionService() {
           @Override
-          protected ApplicationClientProtocol creatRMClient()
+          protected ApplicationClientProtocol createRMClient()
               throws IOException {
             try {
               return createMockRMClient(finishedApplications,
@@ -262,7 +262,7 @@ public class TestAggregatedLogDeletionService {
             return conf;
           }
           @Override
-          protected ApplicationClientProtocol creatRMClient()
+          protected ApplicationClientProtocol createRMClient()
               throws IOException {
             try {
               return createMockRMClient(finishedApplications, null);
@@ -353,7 +353,7 @@ public class TestAggregatedLogDeletionService {
     AggregatedLogDeletionService deletionSvc =
         new AggregatedLogDeletionService() {
           @Override
-          protected ApplicationClientProtocol creatRMClient()
+          protected ApplicationClientProtocol createRMClient()
               throws IOException {
             try {
               return createMockRMClient(finishedApplications, null);

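The three test hunks above all touch the same seam: each test builds an anonymous subclass of AggregatedLogDeletionService and overrides the (now correctly spelled) createRMClient() hook so the deletion task never contacts a real ResourceManager. A condensed sketch of that pattern, using a plain Mockito mock in place of the test class's own createMockRMClient(...) helper (the mock is an assumption for illustration, not the actual test code):

    import static org.mockito.Mockito.mock;

    import java.io.IOException;
    import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
    import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;

    AggregatedLogDeletionService deletionService =
        new AggregatedLogDeletionService() {
          @Override
          protected ApplicationClientProtocol createRMClient()
              throws IOException {
            // Stubbed RM client; the production override goes through
            // ClientRMProxy.createRMProxy(...) instead.
            return mock(ApplicationClientProtocol.class);
          }
        };
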
@@ -605,7 +605,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
 
       // need to return files uploaded or older-than-retention clean up.
       return Sets.union(logValue.getCurrentUpLoadedFilesPath(),
-          logValue.getObseleteRetentionLogFiles());
+          logValue.getObsoleteRetentionLogFiles());
 
     }
   }

@@ -117,9 +117,9 @@ public class LogAggregationService extends AbstractService implements
         LOG.info("Log aggregation debug mode enabled. rollingMonitorInterval = "
             + rollingMonitorInterval);
       } else {
-        LOG.warn("rollingMonitorIntervall should be more than or equal to "
-            + MIN_LOG_ROLLING_INTERVAL + " seconds. Using "
-            + MIN_LOG_ROLLING_INTERVAL + " seconds instead.");
+        LOG.warn("rollingMonitorInterval should be more than or equal to {} " +
+            "seconds. Using {} seconds instead.",
+            MIN_LOG_ROLLING_INTERVAL, MIN_LOG_ROLLING_INTERVAL);
         this.rollingMonitorInterval = MIN_LOG_ROLLING_INTERVAL;
       }
     } else if (rollingMonitorInterval <= 0) {

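Besides correcting "rollingMonitorIntervall", this hunk switches the warning to parameterized logging. The practical difference, sketched below under the assumption that LOG is an SLF4J-style logger (which the {} placeholders imply): the placeholder form only assembles the message when WARN is actually enabled, while concatenation always builds the string first.

    // Concatenation: the message string is built even if WARN is disabled.
    LOG.warn("rollingMonitorInterval should be at least "
        + MIN_LOG_ROLLING_INTERVAL + " seconds.");

    // Parameterized: formatting is deferred until the logger decides to emit it.
    LOG.warn("rollingMonitorInterval should be at least {} seconds.",
        MIN_LOG_ROLLING_INTERVAL);
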
@@ -110,7 +110,7 @@ public class NMLogAggregationStatusTracker extends CompositeService {
       LogAggregationStatus logAggregationStatus, long updateTime,
       String diagnosis, boolean finalized) {
     if (disabled) {
-      LOG.warn("The log aggregation is diabled. No need to update "
+      LOG.warn("The log aggregation is disabled. No need to update "
          + "the log aggregation status");
     }
     // In NM, each application has exactly one appLogAggregator thread
@@ -164,7 +164,7 @@ public class NMLogAggregationStatusTracker extends CompositeService {
   public List<LogAggregationReport> pullCachedLogAggregationReports() {
     List<LogAggregationReport> reports = new ArrayList<>();
     if (disabled) {
-      LOG.warn("The log aggregation is diabled."
+      LOG.warn("The log aggregation is disabled."
          + "There is no cached log aggregation status.");
       return reports;
     }