YARN-2583. Modified AggregatedLogDeletionService to be able to delete rolling aggregated logs. Contributed by Xuan Gong.

This commit is contained in:
Zhijie Shen 2014-10-10 00:10:39 -07:00
parent d3afd730ac
commit cb81bac002
6 changed files with 467 additions and 68 deletions

View File

@ -333,6 +333,9 @@ Release 2.6.0 - UNRELEASED
YARN-2629. Made the distributed shell use the domain-based timeline ACLs.
(zjshen)
YARN-2583. Modified AggregatedLogDeletionService to be able to delete rolling
aggregated logs. (Xuan Gong via zjshen)
OPTIMIZATIONS
BUG FIXES

View File

@ -24,38 +24,53 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
/**
* A service that periodically deletes aggregated logs.
*/
@Private
@InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"})
public class AggregatedLogDeletionService extends AbstractService {
private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
private Timer timer = null;
private long checkIntervalMsecs;
private LogDeletionTask task;
static class LogDeletionTask extends TimerTask {
private Configuration conf;
private long retentionMillis;
private String suffix = null;
private Path remoteRootLogDir = null;
private ApplicationClientProtocol rmClient = null;
public LogDeletionTask(Configuration conf, long retentionSecs) {
public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) {
this.conf = conf;
this.retentionMillis = retentionSecs * 1000;
this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
this.rmClient = rmClient;
}
@Override
@ -64,11 +79,10 @@ public void run() {
LOG.info("aggregated log deletion started.");
try {
FileSystem fs = remoteRootLogDir.getFileSystem(conf);
for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
if(userDir.isDirectory()) {
Path userDirPath = new Path(userDir.getPath(), suffix);
deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs);
deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient);
}
}
} catch (IOException e) {
@ -79,18 +93,36 @@ public void run() {
}
private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
FileSystem fs) {
FileSystem fs, ApplicationClientProtocol rmClient) {
try {
for(FileStatus appDir : fs.listStatus(dir)) {
if(appDir.isDirectory() &&
appDir.getModificationTime() < cutoffMillis) {
if(shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
boolean appTerminated =
isApplicationTerminated(ConverterUtils.toApplicationId(appDir
.getPath().getName()), rmClient);
if(appTerminated && shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
try {
LOG.info("Deleting aggregated logs in "+appDir.getPath());
fs.delete(appDir.getPath(), true);
} catch (IOException e) {
logIOException("Could not delete "+appDir.getPath(), e);
}
} else if (!appTerminated){
try {
for(FileStatus node: fs.listStatus(appDir.getPath())) {
if(node.getModificationTime() < cutoffMillis) {
try {
fs.delete(node.getPath(), true);
} catch (IOException ex) {
logIOException("Could not delete "+appDir.getPath(), ex);
}
}
}
} catch(IOException e) {
logIOException(
"Error reading the contents of " + appDir.getPath(), e);
}
}
}
}
@ -115,6 +147,29 @@ private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis,
}
return shouldDelete;
}
private static boolean isApplicationTerminated(ApplicationId appId,
ApplicationClientProtocol rmClient) throws IOException {
ApplicationReport appReport = null;
try {
appReport =
rmClient.getApplicationReport(
GetApplicationReportRequest.newInstance(appId))
.getApplicationReport();
} catch (ApplicationNotFoundException e) {
return true;
} catch (YarnException e) {
throw new IOException(e);
}
YarnApplicationState currentState = appReport.getYarnApplicationState();
return currentState == YarnApplicationState.FAILED
|| currentState == YarnApplicationState.KILLED
|| currentState == YarnApplicationState.FINISHED;
}
public ApplicationClientProtocol getRMClient() {
return this.rmClient;
}
}
private static void logIOException(String comment, IOException e) {
@ -140,6 +195,7 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
stopRMClient();
stopTimer();
super.serviceStop();
}
@ -156,10 +212,11 @@ private void setLogAggCheckIntervalMsecs(long retentionSecs) {
}
}
public void refreshLogRetentionSettings() {
public void refreshLogRetentionSettings() throws IOException {
if (getServiceState() == STATE.STARTED) {
Configuration conf = createConf();
setConfig(conf);
stopRMClient();
stopTimer();
scheduleLogDeletionTask();
} else {
@ -167,7 +224,7 @@ public void refreshLogRetentionSettings() {
}
}
private void scheduleLogDeletionTask() {
private void scheduleLogDeletionTask() throws IOException {
Configuration conf = getConfig();
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
@ -183,7 +240,7 @@ private void scheduleLogDeletionTask() {
return;
}
setLogAggCheckIntervalMsecs(retentionSecs);
TimerTask task = new LogDeletionTask(conf, retentionSecs);
task = new LogDeletionTask(conf, retentionSecs, creatRMClient());
timer = new Timer();
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
}
@ -201,4 +258,20 @@ public long getCheckIntervalMsecs() {
protected Configuration createConf() {
return new Configuration();
}
// Directly create and use ApplicationClientProtocol.
// We have already marked ApplicationClientProtocol.getApplicationReport
// as @Idempotent, it will automatically take care of RM restart/failover.
@VisibleForTesting
protected ApplicationClientProtocol creatRMClient() throws IOException {
return ClientRMProxy.createRMProxy(getConfig(),
ApplicationClientProtocol.class);
}
@VisibleForTesting
protected void stopRMClient() {
if (task != null && task.getRMClient() != null) {
RPC.stopProxy(task.getRMClient());
}
}
}

View File

@ -20,6 +20,9 @@
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -27,6 +30,12 @@
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Before;
import org.junit.Test;
@ -51,7 +60,7 @@ public void testDeletion() throws Exception {
String root = "mockfs://foo/";
String remoteRootLogDir = root+"tmp/logs";
String suffix = "logs";
Configuration conf = new Configuration();
final Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
@ -69,22 +78,37 @@ public void testDeletion() throws Exception {
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[]{userDirStatus});
ApplicationId appId1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path userLogDir = new Path(userDir, suffix);
Path app1Dir = new Path(userLogDir, "application_1_1");
Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
Path app2Dir = new Path(userLogDir, "application_1_2");
ApplicationId appId2 =
ApplicationId.newInstance(System.currentTimeMillis(), 2);
Path app2Dir = new Path(userLogDir, appId2.toString());
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir);
Path app3Dir = new Path(userLogDir, "application_1_3");
ApplicationId appId3 =
ApplicationId.newInstance(System.currentTimeMillis(), 3);
Path app3Dir = new Path(userLogDir, appId3.toString());
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir);
Path app4Dir = new Path(userLogDir, "application_1_4");
ApplicationId appId4 =
ApplicationId.newInstance(System.currentTimeMillis(), 4);
Path app4Dir = new Path(userLogDir, appId4.toString());
FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
ApplicationId appId5 =
ApplicationId.newInstance(System.currentTimeMillis(), 5);
Path app5Dir = new Path(userLogDir, appId5.toString());
FileStatus app5DirStatus =
new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus, app4DirStatus});
new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus,
app4DirStatus, app5DirStatus });
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[]{});
@ -117,20 +141,55 @@ public void testDeletion() throws Exception {
when(mockFs.listStatus(app4Dir)).thenReturn(
new FileStatus[]{app4Log1Status, app4Log2Status});
Path app5Log1 = new Path(app5Dir, "host1");
FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app5Log1);
AggregatedLogDeletionService.LogDeletionTask task =
new AggregatedLogDeletionService.LogDeletionTask(conf, 1800);
task.run();
verify(mockFs).delete(app1Dir, true);
verify(mockFs, times(0)).delete(app2Dir, true);
verify(mockFs).delete(app3Dir, true);
verify(mockFs).delete(app4Dir, true);
Path app5Log2 = new Path(app5Dir, "host2");
FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2);
when(mockFs.listStatus(app5Dir)).thenReturn(
new FileStatus[]{app5Log1Status, app5Log2Status});
final List<ApplicationId> finishedApplications =
Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3,
appId4));
final List<ApplicationId> runningApplications =
Collections.unmodifiableList(Arrays.asList(appId5));
AggregatedLogDeletionService deletionService =
new AggregatedLogDeletionService() {
@Override
protected ApplicationClientProtocol creatRMClient()
throws IOException {
try {
return createMockRMClient(finishedApplications,
runningApplications);
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
protected void stopRMClient() {
// DO NOTHING
}
};
deletionService.init(conf);
deletionService.start();
verify(mockFs, timeout(2000)).delete(app1Dir, true);
verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
verify(mockFs, timeout(2000)).delete(app3Dir, true);
verify(mockFs, timeout(2000)).delete(app4Dir, true);
verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true);
verify(mockFs, timeout(2000)).delete(app5Log1, true);
verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true);
deletionService.stop();
}
@Test
public void testRefreshLogRetentionSettings() throws IOException {
public void testRefreshLogRetentionSettings() throws Exception {
long now = System.currentTimeMillis();
//time before 2000 sec
long before2000Secs = now - (2000 * 1000);
@ -163,13 +222,17 @@ public void testRefreshLogRetentionSettings() throws IOException {
Path userLogDir = new Path(userDir, suffix);
ApplicationId appId1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
//Set time last modified of app1Dir directory and its files to before2000Secs
Path app1Dir = new Path(userLogDir, "application_1_1");
Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
app1Dir);
ApplicationId appId2 =
ApplicationId.newInstance(System.currentTimeMillis(), 2);
//Set time last modified of app1Dir directory and its files to before50Secs
Path app2Dir = new Path(userLogDir, "application_1_2");
Path app2Dir = new Path(userLogDir, appId2.toString());
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
app2Dir);
@ -190,11 +253,27 @@ public void testRefreshLogRetentionSettings() throws IOException {
when(mockFs.listStatus(app2Dir)).thenReturn(
new FileStatus[] { app2Log1Status });
final List<ApplicationId> finishedApplications =
Collections.unmodifiableList(Arrays.asList(appId1, appId2));
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
@Override
protected Configuration createConf() {
return conf;
}
@Override
protected ApplicationClientProtocol creatRMClient()
throws IOException {
try {
return createMockRMClient(finishedApplications, null);
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
protected void stopRMClient() {
// DO NOTHING
}
};
deletionSvc.init(conf);
@ -253,8 +332,10 @@ public void testCheckInterval() throws Exception {
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[]{userDirStatus});
ApplicationId appId1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path userLogDir = new Path(userDir, suffix);
Path app1Dir = new Path(userLogDir, "application_1_1");
Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
@ -266,8 +347,25 @@ public void testCheckInterval() throws Exception {
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[]{app1Log1Status});
final List<ApplicationId> finishedApplications =
Collections.unmodifiableList(Arrays.asList(appId1));
AggregatedLogDeletionService deletionSvc =
new AggregatedLogDeletionService();
new AggregatedLogDeletionService() {
@Override
protected ApplicationClientProtocol creatRMClient()
throws IOException {
try {
return createMockRMClient(finishedApplications, null);
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
protected void stopRMClient() {
// DO NOTHING
}
};
deletionSvc.init(conf);
deletionSvc.start();
@ -286,11 +384,61 @@ public void testCheckInterval() throws Exception {
deletionSvc.stop();
}
static class MockFileSystem extends FilterFileSystem {
MockFileSystem() {
super(mock(FileSystem.class));
}
public void initialize(URI name, Configuration conf) throws IOException {}
}
private static ApplicationClientProtocol createMockRMClient(
List<ApplicationId> finishedApplicaitons,
List<ApplicationId> runningApplications) throws Exception {
final ApplicationClientProtocol mockProtocol =
mock(ApplicationClientProtocol.class);
if (finishedApplicaitons != null && !finishedApplicaitons.isEmpty()) {
for (ApplicationId appId : finishedApplicaitons) {
GetApplicationReportRequest request =
GetApplicationReportRequest.newInstance(appId);
GetApplicationReportResponse response =
createApplicationReportWithFinishedApplication();
when(mockProtocol.getApplicationReport(request))
.thenReturn(response);
}
}
if (runningApplications != null && !runningApplications.isEmpty()) {
for (ApplicationId appId : runningApplications) {
GetApplicationReportRequest request =
GetApplicationReportRequest.newInstance(appId);
GetApplicationReportResponse response =
createApplicationReportWithRunningApplication();
when(mockProtocol.getApplicationReport(request))
.thenReturn(response);
}
}
return mockProtocol;
}
private static GetApplicationReportResponse
createApplicationReportWithRunningApplication() {
ApplicationReport report = mock(ApplicationReport.class);
when(report.getYarnApplicationState()).thenReturn(
YarnApplicationState.RUNNING);
GetApplicationReportResponse response =
mock(GetApplicationReportResponse.class);
when(response.getApplicationReport()).thenReturn(report);
return response;
}
private static GetApplicationReportResponse
createApplicationReportWithFinishedApplication() {
ApplicationReport report = mock(ApplicationReport.class);
when(report.getYarnApplicationState()).thenReturn(
YarnApplicationState.FINISHED);
GetApplicationReportResponse response =
mock(GetApplicationReportResponse.class);
when(response.getApplicationReport()).thenReturn(report);
return response;
}
}

View File

@ -20,6 +20,10 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -33,6 +37,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
@ -41,6 +46,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
@ -65,6 +72,23 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private static final Log LOG = LogFactory
.getLog(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
// This is temporary solution. The configuration will be deleted once
// we find a more scalable method to only write a single log file per LRS.
private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
= YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
private static final int
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
// This configuration is for debug and test purpose. By setting
// this configuration as true. We can break the lower bound of
// NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
= YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
private static final boolean
DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED = false;
private static final long
NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS = 3600;
private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher;
@ -85,13 +109,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final Map<ApplicationAccessType, String> appAcls;
private final LogAggregationContext logAggregationContext;
private final Context context;
private final int retentionSize;
private final long rollingMonitorInterval;
private final NodeId nodeId;
private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
new HashMap<ContainerId, ContainerLogAggregator>();
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
ApplicationId appId, UserGroupInformation userUgi,
ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls,
@ -111,6 +138,51 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
this.appAcls = appAcls;
this.logAggregationContext = logAggregationContext;
this.context = context;
this.nodeId = nodeId;
int configuredRentionSize =
conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
if (configuredRentionSize <= 0) {
this.retentionSize =
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
} else {
this.retentionSize = configuredRentionSize;
}
long configuredRollingMonitorInterval =
this.logAggregationContext == null ? -1 : this.logAggregationContext
.getRollingIntervalSeconds();
boolean debug_mode =
conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED,
DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED);
if (configuredRollingMonitorInterval > 0
&& configuredRollingMonitorInterval <
NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS) {
if (debug_mode) {
this.rollingMonitorInterval = configuredRollingMonitorInterval;
} else {
LOG.warn(
"rollingMonitorIntervall should be more than or equal to "
+ NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
+ " seconds. Using "
+ NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
+ " seconds instead.");
this.rollingMonitorInterval =
NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS;
}
} else {
if (configuredRollingMonitorInterval <= 0) {
LOG.warn("rollingMonitorInterval is set as "
+ configuredRollingMonitorInterval + ". "
+ "The log rolling mornitoring interval is disabled. "
+ "The logs will be aggregated after this application is finished.");
} else {
LOG.warn("rollingMonitorInterval is set as "
+ configuredRollingMonitorInterval + ". "
+ "The logs will be aggregated every "
+ configuredRollingMonitorInterval + " seconds");
}
this.rollingMonitorInterval = configuredRollingMonitorInterval;
}
}
private void uploadLogsForContainers() {
@ -181,12 +253,17 @@ private void uploadLogsForContainers() {
}
}
// Before upload logs, make sure the number of existing logs
// is smaller than the configured NM log aggregation retention size.
if (uploadedLogsInThisCycle) {
cleanOldLogs();
}
if (writer != null) {
writer.close();
}
final Path renamedPath = logAggregationContext == null ||
logAggregationContext.getRollingIntervalSeconds() <= 0
final Path renamedPath = this.rollingMonitorInterval <= 0
? remoteNodeLogFileForApp : new Path(
remoteNodeLogFileForApp.getParent(),
remoteNodeLogFileForApp.getName() + "_"
@ -198,9 +275,12 @@ private void uploadLogsForContainers() {
@Override
public Object run() throws Exception {
FileSystem remoteFS = FileSystem.get(conf);
if (remoteFS.exists(remoteNodeTmpLogFileForApp)
&& rename) {
remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
if (remoteFS.exists(remoteNodeTmpLogFileForApp)) {
if (rename) {
remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
} else {
remoteFS.delete(remoteNodeTmpLogFileForApp, false);
}
}
return null;
}
@ -218,6 +298,60 @@ public Object run() throws Exception {
}
}
private void cleanOldLogs() {
try {
final FileSystem remoteFS =
this.remoteNodeLogFileForApp.getFileSystem(conf);
Path appDir =
this.remoteNodeLogFileForApp.getParent().makeQualified(
remoteFS.getUri(), remoteFS.getWorkingDirectory());
Set<FileStatus> status =
new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
Iterable<FileStatus> mask =
Iterables.filter(status, new Predicate<FileStatus>() {
@Override
public boolean apply(FileStatus next) {
return next.getPath().getName()
.contains(LogAggregationUtils.getNodeString(nodeId))
&& !next.getPath().getName().endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX);
}
});
status = Sets.newHashSet(mask);
// Normally, we just need to delete one oldest log
// before we upload a new log.
// If we can not delete the older logs in this cycle,
// we will delete them in next cycle.
if (status.size() >= this.retentionSize) {
// sort by the lastModificationTime ascending
List<FileStatus> statusList = new ArrayList<FileStatus>(status);
Collections.sort(statusList, new Comparator<FileStatus>() {
public int compare(FileStatus s1, FileStatus s2) {
return s1.getModificationTime() < s2.getModificationTime() ? -1
: s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
}
});
for (int i = 0 ; i <= statusList.size() - this.retentionSize; i++) {
final FileStatus remove = statusList.get(i);
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
remoteFS.delete(remove.getPath(), false);
return null;
}
});
} catch (Exception e) {
LOG.error("Failed to delete " + remove.getPath(), e);
}
}
}
} catch (Exception e) {
LOG.error("Failed to clean old logs", e);
}
}
@Override
public void run() {
try {
@ -235,9 +369,8 @@ private void doAppLogAggregation() {
while (!this.appFinishing.get() && !this.aborted.get()) {
synchronized(this) {
try {
if (this.logAggregationContext != null && this.logAggregationContext
.getRollingIntervalSeconds() > 0) {
wait(this.logAggregationContext.getRollingIntervalSeconds() * 1000);
if (this.rollingMonitorInterval > 0) {
wait(this.rollingMonitorInterval * 1000);
if (this.appFinishing.get() || this.aborted.get()) {
break;
}

View File

@ -342,7 +342,7 @@ protected void initAppAggregator(final ApplicationId appId, String user,
// New application
final AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, dirsHandler,
getConfig(), appId, userUgi, this.nodeId, dirsHandler,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
appAcls, logAggregationContext, this.context);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {

View File

@ -699,7 +699,7 @@ private void writeContainerLogs(File appLogDir, ContainerId containerId,
}
}
private void verifyContainerLogs(LogAggregationService logAggregationService,
private String verifyContainerLogs(LogAggregationService logAggregationService,
ApplicationId appId, ContainerId[] expectedContainerIds,
String[] logFiles, int numOfContainerLogs, boolean multiLogs)
throws IOException {
@ -811,6 +811,7 @@ private void verifyContainerLogs(LogAggregationService logAggregationService,
Assert.assertEquals(0, thisContainerMap.size());
}
Assert.assertEquals(0, logMap.size());
return targetNodeFile.getPath().getName();
} finally {
reader.close();
}
@ -1219,17 +1220,32 @@ public void testLogAggregationServiceWithPatterns() throws Exception {
dispatcher.stop();
}
@SuppressWarnings("unchecked")
@Test (timeout = 50000)
public void testLogAggregationServiceWithInterval() throws Exception {
final int maxAttempts = 50;
testLogAggregationService(false);
}
@Test (timeout = 50000)
public void testLogAggregationServiceWithRetention() throws Exception {
testLogAggregationService(true);
}
@SuppressWarnings("unchecked")
private void testLogAggregationService(boolean retentionSizeLimitation)
throws Exception {
LogAggregationContext logAggregationContextWithInterval =
Records.newRecord(LogAggregationContext.class);
logAggregationContextWithInterval.setRollingIntervalSeconds(5000);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
if (retentionSizeLimitation) {
// set the retention size as 1. The number of logs for one application
// in one NM should be 1.
this.conf.setInt(YarnConfiguration.NM_PREFIX
+ "log-aggregation.num-log-files-per-app", 1);
}
// by setting this configuration, the log files will not be deleted immediately after
// they are aggregated to remote directory.
// We could use it to test whether the previous aggregated log files will be aggregated
@ -1280,23 +1296,29 @@ public void testLogAggregationServiceWithInterval() throws Exception {
.get(application);
aggregator.doLogAggregationOutOfBand();
int count = 0;
while (numOfLogsAvailable(logAggregationService, application) != 1
&& count <= maxAttempts) {
Thread.sleep(100);
count++;
if (retentionSizeLimitation) {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 1, true, null));
} else {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 1, false, null));
}
String logFileInLastCycle = null;
// Container logs should be uploaded
verifyContainerLogs(logAggregationService, application,
logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles1, 3, true);
Thread.sleep(2000);
// There is no log generated at this time. Do the log aggregation again.
aggregator.doLogAggregationOutOfBand();
// Same logs will not be aggregated again.
// Only one aggregated log file in Remote file directory.
Assert.assertEquals(numOfLogsAvailable(logAggregationService, application),
1);
Assert.assertEquals(numOfLogsAvailable(logAggregationService,
application, true, null), 1);
Thread.sleep(2000);
// Do log aggregation
String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" };
@ -1304,16 +1326,19 @@ public void testLogAggregationServiceWithInterval() throws Exception {
aggregator.doLogAggregationOutOfBand();
count = 0;
while (numOfLogsAvailable(logAggregationService, application) != 2
&& count <= maxAttempts) {
Thread.sleep(100);
count ++;
if (retentionSizeLimitation) {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 1, true, logFileInLastCycle));
} else {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 2, false, null));
}
// Container logs should be uploaded
verifyContainerLogs(logAggregationService, application,
logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles2, 3, true);
Thread.sleep(2000);
// create another logs
String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" };
writeContainerLogs(appLogDir, container, logFiles3);
@ -1323,13 +1348,13 @@ public void testLogAggregationServiceWithInterval() throws Exception {
dispatcher.await();
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
count = 0;
while (numOfLogsAvailable(logAggregationService, application) != 3
&& count <= maxAttempts) {
Thread.sleep(100);
count ++;
if (retentionSizeLimitation) {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 1, true, logFileInLastCycle));
} else {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 3, false, null));
}
verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles3, 3, true);
logAggregationService.stop();
@ -1338,7 +1363,8 @@ public void testLogAggregationServiceWithInterval() throws Exception {
}
private int numOfLogsAvailable(LogAggregationService logAggregationService,
ApplicationId appId) throws IOException {
ApplicationId appId, boolean sizeLimited, String lastLogFile)
throws IOException {
Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
RemoteIterator<FileStatus> nodeFiles = null;
try {
@ -1354,7 +1380,9 @@ private int numOfLogsAvailable(LogAggregationService logAggregationService,
while (nodeFiles.hasNext()) {
FileStatus status = nodeFiles.next();
String filename = status.getPath().getName();
if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)) {
if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)
|| (lastLogFile != null && filename.contains(lastLogFile)
&& sizeLimited)) {
return -1;
}
if (filename.contains(LogAggregationUtils
@ -1364,4 +1392,18 @@ private int numOfLogsAvailable(LogAggregationService logAggregationService,
}
return count;
}
private boolean waitAndCheckLogNum(
LogAggregationService logAggregationService, ApplicationId application,
int maxAttempts, int expectNum, boolean sizeLimited, String lastLogFile)
throws IOException, InterruptedException {
int count = 0;
while (numOfLogsAvailable(logAggregationService, application, sizeLimited,
lastLogFile) != expectNum && count <= maxAttempts) {
Thread.sleep(500);
count++;
}
return numOfLogsAvailable(logAggregationService, application, sizeLimited,
lastLogFile) == expectNum;
}
}