YARN-8584. Several typos in Log Aggregation related classes. Contributed by Szilard Nemeth.

This commit is contained in:
bibinchundatt 2018-07-30 23:25:19 +05:30
parent e8f952ef06
commit 2b39ad2698
10 changed files with 35 additions and 35 deletions

View File

@ -258,7 +258,7 @@ public class AggregatedLogDeletionService extends AbstractService {
return; return;
} }
setLogAggCheckIntervalMsecs(retentionSecs); setLogAggCheckIntervalMsecs(retentionSecs);
task = new LogDeletionTask(conf, retentionSecs, creatRMClient()); task = new LogDeletionTask(conf, retentionSecs, createRMClient());
timer = new Timer(); timer = new Timer();
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs); timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
} }
@ -281,7 +281,7 @@ public class AggregatedLogDeletionService extends AbstractService {
// We have already marked ApplicationClientProtocol.getApplicationReport // We have already marked ApplicationClientProtocol.getApplicationReport
// as @Idempotent, it will automatically take care of RM restart/failover. // as @Idempotent, it will automatically take care of RM restart/failover.
@VisibleForTesting @VisibleForTesting
protected ApplicationClientProtocol creatRMClient() throws IOException { protected ApplicationClientProtocol createRMClient() throws IOException {
return ClientRMProxy.createRMProxy(getConfig(), return ClientRMProxy.createRMProxy(getConfig(),
ApplicationClientProtocol.class); ApplicationClientProtocol.class);
} }

View File

@ -178,7 +178,7 @@ public class AggregatedLogFormat {
* The set of log files that are older than retention policy that will * The set of log files that are older than retention policy that will
* not be uploaded but ready for deletion. * not be uploaded but ready for deletion.
*/ */
private final Set<File> obseleteRetentionLogFiles = new HashSet<File>(); private final Set<File> obsoleteRetentionLogFiles = new HashSet<File>();
// TODO Maybe add a version string here. Instead of changing the version of // TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format // the entire k-v format
@ -324,7 +324,7 @@ public class AggregatedLogFormat {
// if log files are older than retention policy, do not upload them. // if log files are older than retention policy, do not upload them.
// but schedule them for deletion. // but schedule them for deletion.
if(logRetentionContext != null && !logRetentionContext.shouldRetainLog()){ if(logRetentionContext != null && !logRetentionContext.shouldRetainLog()){
obseleteRetentionLogFiles.addAll(candidates); obsoleteRetentionLogFiles.addAll(candidates);
candidates.clear(); candidates.clear();
return candidates; return candidates;
} }
@ -396,9 +396,9 @@ public class AggregatedLogFormat {
return info; return info;
} }
public Set<Path> getObseleteRetentionLogFiles() { public Set<Path> getObsoleteRetentionLogFiles() {
Set<Path> path = new HashSet<Path>(); Set<Path> path = new HashSet<Path>();
for(File file: this.obseleteRetentionLogFiles) { for(File file: this.obsoleteRetentionLogFiles) {
path.add(new Path(file.getAbsolutePath())); path.add(new Path(file.getAbsolutePath()));
} }
return path; return path;

View File

@ -115,16 +115,16 @@ public abstract class LogAggregationFileController {
*/ */
public void initialize(Configuration conf, String controllerName) { public void initialize(Configuration conf, String controllerName) {
this.conf = conf; this.conf = conf;
int configuredRentionSize = conf.getInt( int configuredRetentionSize = conf.getInt(
YarnConfiguration.NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP, YarnConfiguration.NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
YarnConfiguration YarnConfiguration
.DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP); .DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
if (configuredRentionSize <= 0) { if (configuredRetentionSize <= 0) {
this.retentionSize = this.retentionSize =
YarnConfiguration YarnConfiguration
.DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP; .DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
} else { } else {
this.retentionSize = configuredRentionSize; this.retentionSize = configuredRetentionSize;
} }
this.fileControllerName = controllerName; this.fileControllerName = controllerName;
initInternal(conf); initInternal(conf);

View File

@ -187,8 +187,8 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath()); FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
int bufferSize = 65536; int bufferSize = 65536;
for (IndexedFileLogMeta candidate : candidates) { for (IndexedFileLogMeta candidate : candidates) {
if (candidate.getLastModificatedTime() < startTime if (candidate.getLastModifiedTime() < startTime
|| candidate.getLastModificatedTime() > endTime) { || candidate.getLastModifiedTime() > endTime) {
continue; continue;
} }
byte[] cbuf = new byte[bufferSize]; byte[] cbuf = new byte[bufferSize];
@ -205,7 +205,7 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
html.pre().__("\n\n").__(); html.pre().__("\n\n").__();
html.p().__("Log Type: " + candidate.getFileName()).__(); html.p().__("Log Type: " + candidate.getFileName()).__();
html.p().__("Log Upload Time: " + Times.format( html.p().__("Log Upload Time: " + Times.format(
candidate.getLastModificatedTime())).__(); candidate.getLastModifiedTime())).__();
html.p().__("Log Length: " + Long.toString( html.p().__("Log Length: " + Long.toString(
logLength)).__(); logLength)).__();
long startIndex = start < 0 long startIndex = start < 0

View File

@ -404,7 +404,7 @@ public class LogAggregationIndexedFileController
meta.setStartIndex(outputStreamState.getStartPos()); meta.setStartIndex(outputStreamState.getStartPos());
meta.setFileSize(fileLength); meta.setFileSize(fileLength);
} }
meta.setLastModificatedTime(logFile.lastModified()); meta.setLastModifiedTime(logFile.lastModified());
metas.add(meta); metas.add(meta);
} }
logsMetaInThisCycle.addContainerLogMeta(containerId, metas); logsMetaInThisCycle.addContainerLogMeta(containerId, metas);
@ -499,12 +499,12 @@ public class LogAggregationIndexedFileController
.getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(), .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(),
this.remoteRootLogDir, this.remoteRootLogDirSuffix); this.remoteRootLogDir, this.remoteRootLogDirSuffix);
if (!nodeFiles.hasNext()) { if (!nodeFiles.hasNext()) {
throw new IOException("There is no available log fils for " throw new IOException("There is no available log file for "
+ "application:" + appId); + "application:" + appId);
} }
List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId); List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
if (allFiles.isEmpty()) { if (allFiles.isEmpty()) {
throw new IOException("There is no available log fils for " throw new IOException("There is no available log file for "
+ "application:" + appId); + "application:" + appId);
} }
Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles); Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
@ -581,7 +581,7 @@ public class LogAggregationIndexedFileController
decompressor, getFSInputBufferSize(conf)); decompressor, getFSInputBufferSize(conf));
LogToolUtils.outputContainerLog(candidate.getContainerId(), LogToolUtils.outputContainerLog(candidate.getContainerId(),
nodeName, candidate.getFileName(), candidate.getFileSize(), size, nodeName, candidate.getFileName(), candidate.getFileSize(), size,
Times.format(candidate.getLastModificatedTime()), Times.format(candidate.getLastModifiedTime()),
in, os, buf, ContainerLogAggregationType.AGGREGATED); in, os, buf, ContainerLogAggregationType.AGGREGATED);
byte[] b = aggregatedLogSuffix(candidate.getFileName()) byte[] b = aggregatedLogSuffix(candidate.getFileName())
.getBytes(Charset.forName("UTF-8")); .getBytes(Charset.forName("UTF-8"));
@ -618,12 +618,12 @@ public class LogAggregationIndexedFileController
.getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir, .getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir,
this.remoteRootLogDirSuffix); this.remoteRootLogDirSuffix);
if (!nodeFiles.hasNext()) { if (!nodeFiles.hasNext()) {
throw new IOException("There is no available log fils for " throw new IOException("There is no available log file for "
+ "application:" + appId); + "application:" + appId);
} }
List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId); List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
if (allFiles.isEmpty()) { if (allFiles.isEmpty()) {
throw new IOException("There is no available log fils for " throw new IOException("There is no available log file for "
+ "application:" + appId); + "application:" + appId);
} }
Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles); Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
@ -660,7 +660,7 @@ public class LogAggregationIndexedFileController
for (IndexedFileLogMeta aMeta : log.getValue()) { for (IndexedFileLogMeta aMeta : log.getValue()) {
meta.addLogMeta(aMeta.getFileName(), Long.toString( meta.addLogMeta(aMeta.getFileName(), Long.toString(
aMeta.getFileSize()), aMeta.getFileSize()),
Times.format(aMeta.getLastModificatedTime())); Times.format(aMeta.getLastModifiedTime()));
} }
containersLogMeta.add(meta); containersLogMeta.add(meta);
} }
@ -671,7 +671,7 @@ public class LogAggregationIndexedFileController
logMeta.getContainerLogMeta(containerIdStr)) { logMeta.getContainerLogMeta(containerIdStr)) {
meta.addLogMeta(log.getFileName(), Long.toString( meta.addLogMeta(log.getFileName(), Long.toString(
log.getFileSize()), log.getFileSize()),
Times.format(log.getLastModificatedTime())); Times.format(log.getLastModifiedTime()));
} }
containersLogMeta.add(meta); containersLogMeta.add(meta);
} }
@ -1002,7 +1002,7 @@ public class LogAggregationIndexedFileController
private String fileName; private String fileName;
private long fileSize; private long fileSize;
private long fileCompressedSize; private long fileCompressedSize;
private long lastModificatedTime; private long lastModifiedTime;
private long startIndex; private long startIndex;
public String getFileName() { public String getFileName() {
@ -1026,11 +1026,11 @@ public class LogAggregationIndexedFileController
this.fileCompressedSize = fileCompressedSize; this.fileCompressedSize = fileCompressedSize;
} }
public long getLastModificatedTime() { public long getLastModifiedTime() {
return lastModificatedTime; return lastModifiedTime;
} }
public void setLastModificatedTime(long lastModificatedTime) { public void setLastModifiedTime(long lastModifiedTime) {
this.lastModificatedTime = lastModificatedTime; this.lastModifiedTime = lastModifiedTime;
} }
public long getStartIndex() { public long getStartIndex() {

View File

@ -275,7 +275,7 @@ public class LogAggregationTFileController
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, appOwner); .getRemoteNodeFileDir(conf, appId, appOwner);
if (nodeFiles == null) { if (nodeFiles == null) {
throw new IOException("There is no available log fils for " throw new IOException("There is no available log file for "
+ "application:" + appId); + "application:" + appId);
} }
while (nodeFiles.hasNext()) { while (nodeFiles.hasNext()) {

View File

@ -160,7 +160,7 @@ public class TestAggregatedLogDeletionService {
AggregatedLogDeletionService deletionService = AggregatedLogDeletionService deletionService =
new AggregatedLogDeletionService() { new AggregatedLogDeletionService() {
@Override @Override
protected ApplicationClientProtocol creatRMClient() protected ApplicationClientProtocol createRMClient()
throws IOException { throws IOException {
try { try {
return createMockRMClient(finishedApplications, return createMockRMClient(finishedApplications,
@ -262,7 +262,7 @@ public class TestAggregatedLogDeletionService {
return conf; return conf;
} }
@Override @Override
protected ApplicationClientProtocol creatRMClient() protected ApplicationClientProtocol createRMClient()
throws IOException { throws IOException {
try { try {
return createMockRMClient(finishedApplications, null); return createMockRMClient(finishedApplications, null);
@ -353,7 +353,7 @@ public class TestAggregatedLogDeletionService {
AggregatedLogDeletionService deletionSvc = AggregatedLogDeletionService deletionSvc =
new AggregatedLogDeletionService() { new AggregatedLogDeletionService() {
@Override @Override
protected ApplicationClientProtocol creatRMClient() protected ApplicationClientProtocol createRMClient()
throws IOException { throws IOException {
try { try {
return createMockRMClient(finishedApplications, null); return createMockRMClient(finishedApplications, null);

View File

@ -632,7 +632,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
// need to return files uploaded or older-than-retention clean up. // need to return files uploaded or older-than-retention clean up.
return Sets.union(logValue.getCurrentUpLoadedFilesPath(), return Sets.union(logValue.getCurrentUpLoadedFilesPath(),
logValue.getObseleteRetentionLogFiles()); logValue.getObsoleteRetentionLogFiles());
} }
} }

View File

@ -117,9 +117,9 @@ public class LogAggregationService extends AbstractService implements
LOG.info("Log aggregation debug mode enabled. rollingMonitorInterval = " LOG.info("Log aggregation debug mode enabled. rollingMonitorInterval = "
+ rollingMonitorInterval); + rollingMonitorInterval);
} else { } else {
LOG.warn("rollingMonitorIntervall should be more than or equal to " LOG.warn("rollingMonitorInterval should be more than or equal to {} " +
+ MIN_LOG_ROLLING_INTERVAL + " seconds. Using " "seconds. Using {} seconds instead.",
+ MIN_LOG_ROLLING_INTERVAL + " seconds instead."); MIN_LOG_ROLLING_INTERVAL, MIN_LOG_ROLLING_INTERVAL);
this.rollingMonitorInterval = MIN_LOG_ROLLING_INTERVAL; this.rollingMonitorInterval = MIN_LOG_ROLLING_INTERVAL;
} }
} else if (rollingMonitorInterval <= 0) { } else if (rollingMonitorInterval <= 0) {

View File

@ -110,7 +110,7 @@ public class NMLogAggregationStatusTracker extends CompositeService {
LogAggregationStatus logAggregationStatus, long updateTime, LogAggregationStatus logAggregationStatus, long updateTime,
String diagnosis, boolean finalized) { String diagnosis, boolean finalized) {
if (disabled) { if (disabled) {
LOG.warn("The log aggregation is diabled. No need to update " LOG.warn("The log aggregation is disabled. No need to update "
+ "the log aggregation status"); + "the log aggregation status");
} }
// In NM, each application has exactly one appLogAggregator thread // In NM, each application has exactly one appLogAggregator thread
@ -164,7 +164,7 @@ public class NMLogAggregationStatusTracker extends CompositeService {
public List<LogAggregationReport> pullCachedLogAggregationReports() { public List<LogAggregationReport> pullCachedLogAggregationReports() {
List<LogAggregationReport> reports = new ArrayList<>(); List<LogAggregationReport> reports = new ArrayList<>();
if (disabled) { if (disabled) {
LOG.warn("The log aggregation is diabled." LOG.warn("The log aggregation is disabled."
+ "There is no cached log aggregation status."); + "There is no cached log aggregation status.");
return reports; return reports;
} }