YARN-2583. Modified AggregatedLogDeletionService to be able to delete rolling aggregated logs. Contributed by Xuan Gong.

(cherry picked from commit cb81bac002)
Zhijie Shen 2014-10-10 00:10:39 -07:00
parent b81641a310
commit 1e6d81a886
6 changed files with 467 additions and 68 deletions
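
In short, the patch makes the shared AggregatedLogDeletionService aware of application state: directories of finished applications are still removed wholesale once they pass the retention cutoff, while directories of applications that are still running (long-running services with rolling aggregation) are kept and only their rolled-up per-node files older than the cutoff are deleted. The sketch below condenses that decision into one method for readability; it mirrors the LogDeletionTask changes in the diff but omits error handling and the shouldDeleteLogDir() check, and the wrapper class and method names are hypothetical.

    // Illustrative sketch only -- condensed from the LogDeletionTask changes below.
    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
    import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.YarnApplicationState;
    import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
    import org.apache.hadoop.yarn.exceptions.YarnException;
    import org.apache.hadoop.yarn.util.ConverterUtils;

    public class RollingLogDeletionSketch {

      static void deleteOldAppLogs(FileSystem fs, Path userLogDir, long cutoffMillis,
          ApplicationClientProtocol rmClient) throws IOException {
        for (FileStatus appDir : fs.listStatus(userLogDir)) {
          if (!appDir.isDirectory() || appDir.getModificationTime() >= cutoffMillis) {
            continue; // not an app directory, or still within retention
          }
          ApplicationId appId =
              ConverterUtils.toApplicationId(appDir.getPath().getName());
          if (isApplicationTerminated(appId, rmClient)) {
            // Finished/killed/failed application: delete the whole directory,
            // which is what the service did for every app before this patch.
            fs.delete(appDir.getPath(), true);
          } else {
            // Application is still running: keep its directory and delete only
            // the rolled-up per-node log files that are past the cutoff.
            for (FileStatus node : fs.listStatus(appDir.getPath())) {
              if (node.getModificationTime() < cutoffMillis) {
                fs.delete(node.getPath(), true);
              }
            }
          }
        }
      }

      static boolean isApplicationTerminated(ApplicationId appId,
          ApplicationClientProtocol rmClient) throws IOException {
        try {
          YarnApplicationState state = rmClient.getApplicationReport(
              GetApplicationReportRequest.newInstance(appId))
              .getApplicationReport().getYarnApplicationState();
          return state == YarnApplicationState.FAILED
              || state == YarnApplicationState.KILLED
              || state == YarnApplicationState.FINISHED;
        } catch (ApplicationNotFoundException e) {
          return true; // the RM no longer knows the app; treat it as finished
        } catch (YarnException e) {
          throw new IOException(e);
        }
      }
    }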

hadoop-yarn-project/CHANGES.txt

@@ -303,6 +303,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2629. Made the distributed shell use the domain-based timeline ACLs.
     (zjshen)

+    YARN-2583. Modified AggregatedLogDeletionService to be able to delete rolling
+    aggregated logs. (Xuan Gong via zjshen)
+
   OPTIMIZATIONS

   BUG FIXES

AggregatedLogDeletionService.java

@@ -24,38 +24,53 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;

 /**
  * A service that periodically deletes aggregated logs.
  */
-@Private
+@InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"})
 public class AggregatedLogDeletionService extends AbstractService {
   private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);

   private Timer timer = null;
   private long checkIntervalMsecs;
+  private LogDeletionTask task;

   static class LogDeletionTask extends TimerTask {
     private Configuration conf;
     private long retentionMillis;
     private String suffix = null;
     private Path remoteRootLogDir = null;
+    private ApplicationClientProtocol rmClient = null;

-    public LogDeletionTask(Configuration conf, long retentionSecs) {
+    public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) {
       this.conf = conf;
       this.retentionMillis = retentionSecs * 1000;
       this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
       this.remoteRootLogDir =
           new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
               YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+      this.rmClient = rmClient;
     }

     @Override
@@ -64,11 +79,10 @@ public void run() {
       LOG.info("aggregated log deletion started.");
       try {
         FileSystem fs = remoteRootLogDir.getFileSystem(conf);
         for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
           if(userDir.isDirectory()) {
             Path userDirPath = new Path(userDir.getPath(), suffix);
-            deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs);
+            deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient);
           }
         }
       } catch (IOException e) {
@@ -79,18 +93,36 @@ public void run() {
     }

     private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
-        FileSystem fs) {
+        FileSystem fs, ApplicationClientProtocol rmClient) {
       try {
         for(FileStatus appDir : fs.listStatus(dir)) {
           if(appDir.isDirectory() &&
               appDir.getModificationTime() < cutoffMillis) {
-            if(shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
+            boolean appTerminated =
+                isApplicationTerminated(ConverterUtils.toApplicationId(appDir
+                  .getPath().getName()), rmClient);
+            if(appTerminated && shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
               try {
                 LOG.info("Deleting aggregated logs in "+appDir.getPath());
                 fs.delete(appDir.getPath(), true);
               } catch (IOException e) {
                 logIOException("Could not delete "+appDir.getPath(), e);
               }
+            } else if (!appTerminated){
+              try {
+                for(FileStatus node: fs.listStatus(appDir.getPath())) {
+                  if(node.getModificationTime() < cutoffMillis) {
+                    try {
+                      fs.delete(node.getPath(), true);
+                    } catch (IOException ex) {
+                      logIOException("Could not delete "+appDir.getPath(), ex);
+                    }
+                  }
+                }
+              } catch(IOException e) {
+                logIOException(
+                    "Error reading the contents of " + appDir.getPath(), e);
+              }
             }
           }
         }
@@ -115,6 +147,29 @@ private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis,
       }
       return shouldDelete;
     }

+    private static boolean isApplicationTerminated(ApplicationId appId,
+        ApplicationClientProtocol rmClient) throws IOException {
+      ApplicationReport appReport = null;
+      try {
+        appReport =
+            rmClient.getApplicationReport(
+                GetApplicationReportRequest.newInstance(appId))
+              .getApplicationReport();
+      } catch (ApplicationNotFoundException e) {
+        return true;
+      } catch (YarnException e) {
+        throw new IOException(e);
+      }
+      YarnApplicationState currentState = appReport.getYarnApplicationState();
+      return currentState == YarnApplicationState.FAILED
+          || currentState == YarnApplicationState.KILLED
+          || currentState == YarnApplicationState.FINISHED;
+    }
+
+    public ApplicationClientProtocol getRMClient() {
+      return this.rmClient;
+    }
   }

   private static void logIOException(String comment, IOException e) {
@@ -140,6 +195,7 @@ protected void serviceStart() throws Exception {

   @Override
   protected void serviceStop() throws Exception {
+    stopRMClient();
     stopTimer();
     super.serviceStop();
   }
@@ -156,10 +212,11 @@ private void setLogAggCheckIntervalMsecs(long retentionSecs) {
     }
   }

-  public void refreshLogRetentionSettings() {
+  public void refreshLogRetentionSettings() throws IOException {
     if (getServiceState() == STATE.STARTED) {
       Configuration conf = createConf();
       setConfig(conf);
+      stopRMClient();
       stopTimer();
       scheduleLogDeletionTask();
     } else {
@@ -167,7 +224,7 @@ public void refreshLogRetentionSettings() {
     }
   }

-  private void scheduleLogDeletionTask() {
+  private void scheduleLogDeletionTask() throws IOException {
     Configuration conf = getConfig();
     if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
         YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
@@ -183,7 +240,7 @@ private void scheduleLogDeletionTask() {
       return;
     }
     setLogAggCheckIntervalMsecs(retentionSecs);
-    TimerTask task = new LogDeletionTask(conf, retentionSecs);
+    task = new LogDeletionTask(conf, retentionSecs, creatRMClient());
     timer = new Timer();
     timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
   }
@@ -201,4 +258,20 @@ public long getCheckIntervalMsecs() {
   protected Configuration createConf() {
     return new Configuration();
   }
+
+  // Directly create and use ApplicationClientProtocol.
+  // We have already marked ApplicationClientProtocol.getApplicationReport
+  // as @Idempotent, it will automatically take care of RM restart/failover.
+  @VisibleForTesting
+  protected ApplicationClientProtocol creatRMClient() throws IOException {
+    return ClientRMProxy.createRMProxy(getConfig(),
+        ApplicationClientProtocol.class);
+  }
+
+  @VisibleForTesting
+  protected void stopRMClient() {
+    if (task != null && task.getRMClient() != null) {
+      RPC.stopProxy(task.getRMClient());
+    }
+  }
 }
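
For orientation before the tests: AggregatedLogDeletionService follows the standard YARN service lifecycle, and the tests in TestAggregatedLogDeletionService below drive it exactly that way (init, start, stop) with an overridden creatRMClient(). A minimal lifecycle sketch, with placeholder retention values; in a real cluster the service typically runs inside a history-server daemon rather than standalone:

    // Minimal lifecycle sketch, modeled on the tests in the next file.
    // The retention values are placeholders, not recommendations.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;

    public class DeletionServiceLifecycleSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
        conf.setLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 1800);

        AggregatedLogDeletionService svc = new AggregatedLogDeletionService();
        svc.init(conf);   // stores the configuration
        svc.start();      // creates the RM proxy and schedules LogDeletionTask
        // ... later, e.g. on daemon shutdown:
        svc.stop();       // stops the RM proxy and cancels the timer
      }
    }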

TestAggregatedLogDeletionService.java

@@ -20,6 +20,9 @@
 import java.io.IOException;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -27,6 +30,12 @@
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,7 +60,7 @@ public void testDeletion() throws Exception {
     String root = "mockfs://foo/";
     String remoteRootLogDir = root+"tmp/logs";
     String suffix = "logs";
-    Configuration conf = new Configuration();
+    final Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
     conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
     conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
@@ -70,21 +79,36 @@ public void testDeletion() throws Exception {
     when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
         new FileStatus[]{userDirStatus});

+    ApplicationId appId1 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
     Path userLogDir = new Path(userDir, suffix);
-    Path app1Dir = new Path(userLogDir, "application_1_1");
+    Path app1Dir = new Path(userLogDir, appId1.toString());
     FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);

-    Path app2Dir = new Path(userLogDir, "application_1_2");
+    ApplicationId appId2 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 2);
+    Path app2Dir = new Path(userLogDir, appId2.toString());
     FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir);

-    Path app3Dir = new Path(userLogDir, "application_1_3");
+    ApplicationId appId3 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 3);
+    Path app3Dir = new Path(userLogDir, appId3.toString());
     FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir);

-    Path app4Dir = new Path(userLogDir, "application_1_4");
+    ApplicationId appId4 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 4);
+    Path app4Dir = new Path(userLogDir, appId4.toString());
     FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);

+    ApplicationId appId5 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 5);
+    Path app5Dir = new Path(userLogDir, appId5.toString());
+    FileStatus app5DirStatus =
+        new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir);
+
     when(mockFs.listStatus(userLogDir)).thenReturn(
-        new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus, app4DirStatus});
+        new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus,
+            app4DirStatus, app5DirStatus });

     when(mockFs.listStatus(app1Dir)).thenReturn(
         new FileStatus[]{});
@@ -118,19 +142,54 @@ public void testDeletion() throws Exception {
     when(mockFs.listStatus(app4Dir)).thenReturn(
         new FileStatus[]{app4Log1Status, app4Log2Status});

-    AggregatedLogDeletionService.LogDeletionTask task =
-        new AggregatedLogDeletionService.LogDeletionTask(conf, 1800);
-    task.run();
+    Path app5Log1 = new Path(app5Dir, "host1");
+    FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app5Log1);
+    Path app5Log2 = new Path(app5Dir, "host2");
+    FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2);

-    verify(mockFs).delete(app1Dir, true);
-    verify(mockFs, times(0)).delete(app2Dir, true);
-    verify(mockFs).delete(app3Dir, true);
-    verify(mockFs).delete(app4Dir, true);
+    when(mockFs.listStatus(app5Dir)).thenReturn(
+        new FileStatus[]{app5Log1Status, app5Log2Status});
+
+    final List<ApplicationId> finishedApplications =
+        Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3,
+            appId4));
+    final List<ApplicationId> runningApplications =
+        Collections.unmodifiableList(Arrays.asList(appId5));
+
+    AggregatedLogDeletionService deletionService =
+        new AggregatedLogDeletionService() {
+          @Override
+          protected ApplicationClientProtocol creatRMClient()
+              throws IOException {
+            try {
+              return createMockRMClient(finishedApplications,
+                  runningApplications);
+            } catch (Exception e) {
+              throw new IOException(e);
+            }
+          }
+          @Override
+          protected void stopRMClient() {
+            // DO NOTHING
+          }
+        };
+
+    deletionService.init(conf);
+    deletionService.start();
+
+    verify(mockFs, timeout(2000)).delete(app1Dir, true);
+    verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
+    verify(mockFs, timeout(2000)).delete(app3Dir, true);
+    verify(mockFs, timeout(2000)).delete(app4Dir, true);
+    verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true);
+
+    verify(mockFs, timeout(2000)).delete(app5Log1, true);
+    verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true);
+
+    deletionService.stop();
   }

   @Test
-  public void testRefreshLogRetentionSettings() throws IOException {
+  public void testRefreshLogRetentionSettings() throws Exception {
     long now = System.currentTimeMillis();
     //time before 2000 sec
     long before2000Secs = now - (2000 * 1000);
@@ -163,13 +222,17 @@ public void testRefreshLogRetentionSettings() throws IOException {
     Path userLogDir = new Path(userDir, suffix);

+    ApplicationId appId1 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
     //Set time last modified of app1Dir directory and its files to before2000Secs
-    Path app1Dir = new Path(userLogDir, "application_1_1");
+    Path app1Dir = new Path(userLogDir, appId1.toString());
     FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
         app1Dir);

+    ApplicationId appId2 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 2);
     //Set time last modified of app1Dir directory and its files to before50Secs
-    Path app2Dir = new Path(userLogDir, "application_1_2");
+    Path app2Dir = new Path(userLogDir, appId2.toString());
     FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
         app2Dir);
@@ -190,11 +253,27 @@ public void testRefreshLogRetentionSettings() throws IOException {
     when(mockFs.listStatus(app2Dir)).thenReturn(
         new FileStatus[] { app2Log1Status });

+    final List<ApplicationId> finishedApplications =
+        Collections.unmodifiableList(Arrays.asList(appId1, appId2));
+
     AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
       @Override
       protected Configuration createConf() {
         return conf;
       }
+      @Override
+      protected ApplicationClientProtocol creatRMClient()
+          throws IOException {
+        try {
+          return createMockRMClient(finishedApplications, null);
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+      @Override
+      protected void stopRMClient() {
+        // DO NOTHING
+      }
     };

     deletionSvc.init(conf);
@@ -253,8 +332,10 @@ public void testCheckInterval() throws Exception {
     when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
         new FileStatus[]{userDirStatus});

+    ApplicationId appId1 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
     Path userLogDir = new Path(userDir, suffix);
-    Path app1Dir = new Path(userLogDir, "application_1_1");
+    Path app1Dir = new Path(userLogDir, appId1.toString());
     FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);

     when(mockFs.listStatus(userLogDir)).thenReturn(
@@ -266,8 +347,25 @@ public void testCheckInterval() throws Exception {
     when(mockFs.listStatus(app1Dir)).thenReturn(
         new FileStatus[]{app1Log1Status});

+    final List<ApplicationId> finishedApplications =
+        Collections.unmodifiableList(Arrays.asList(appId1));
+
     AggregatedLogDeletionService deletionSvc =
-        new AggregatedLogDeletionService();
+        new AggregatedLogDeletionService() {
+          @Override
+          protected ApplicationClientProtocol creatRMClient()
+              throws IOException {
+            try {
+              return createMockRMClient(finishedApplications, null);
+            } catch (Exception e) {
+              throw new IOException(e);
+            }
+          }
+          @Override
+          protected void stopRMClient() {
+            // DO NOTHING
+          }
+        };
     deletionSvc.init(conf);
     deletionSvc.start();
@@ -293,4 +391,54 @@ static class MockFileSystem extends FilterFileSystem {
     }
     public void initialize(URI name, Configuration conf) throws IOException {}
   }
+
+  private static ApplicationClientProtocol createMockRMClient(
+      List<ApplicationId> finishedApplicaitons,
+      List<ApplicationId> runningApplications) throws Exception {
+    final ApplicationClientProtocol mockProtocol =
+        mock(ApplicationClientProtocol.class);
+    if (finishedApplicaitons != null && !finishedApplicaitons.isEmpty()) {
+      for (ApplicationId appId : finishedApplicaitons) {
+        GetApplicationReportRequest request =
+            GetApplicationReportRequest.newInstance(appId);
+        GetApplicationReportResponse response =
+            createApplicationReportWithFinishedApplication();
+        when(mockProtocol.getApplicationReport(request))
+            .thenReturn(response);
+      }
+    }
+    if (runningApplications != null && !runningApplications.isEmpty()) {
+      for (ApplicationId appId : runningApplications) {
+        GetApplicationReportRequest request =
+            GetApplicationReportRequest.newInstance(appId);
+        GetApplicationReportResponse response =
+            createApplicationReportWithRunningApplication();
+        when(mockProtocol.getApplicationReport(request))
+            .thenReturn(response);
+      }
+    }
+    return mockProtocol;
+  }
+
+  private static GetApplicationReportResponse
+      createApplicationReportWithRunningApplication() {
+    ApplicationReport report = mock(ApplicationReport.class);
+    when(report.getYarnApplicationState()).thenReturn(
+        YarnApplicationState.RUNNING);
+    GetApplicationReportResponse response =
+        mock(GetApplicationReportResponse.class);
+    when(response.getApplicationReport()).thenReturn(report);
+    return response;
+  }
+
+  private static GetApplicationReportResponse
+      createApplicationReportWithFinishedApplication() {
+    ApplicationReport report = mock(ApplicationReport.class);
+    when(report.getYarnApplicationState()).thenReturn(
+        YarnApplicationState.FINISHED);
+    GetApplicationReportResponse response =
+        mock(GetApplicationReportResponse.class);
+    when(response.getApplicationReport()).thenReturn(report);
+    return response;
+  }
 }

AppLogAggregatorImpl.java

@@ -20,6 +20,10 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -33,6 +37,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -41,6 +46,8 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
@@ -65,6 +72,23 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private static final Log LOG = LogFactory
       .getLog(AppLogAggregatorImpl.class);
   private static final int THREAD_SLEEP_TIME = 1000;
+  // This is temporary solution. The configuration will be deleted once
+  // we find a more scalable method to only write a single log file per LRS.
+  private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
+      = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
+  private static final int
+      DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
+
+  // This configuration is for debug and test purpose. By setting
+  // this configuration as true. We can break the lower bound of
+  // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
+  private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
+      = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
+  private static final boolean
+      DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED = false;
+
+  private static final long
+      NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS = 3600;

   private final LocalDirsHandlerService dirsHandler;
   private final Dispatcher dispatcher;
@@ -85,13 +109,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private final Map<ApplicationAccessType, String> appAcls;
   private final LogAggregationContext logAggregationContext;
   private final Context context;
+  private final int retentionSize;
+  private final long rollingMonitorInterval;
+  private final NodeId nodeId;

   private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
       new HashMap<ContainerId, ContainerLogAggregator>();

   public AppLogAggregatorImpl(Dispatcher dispatcher,
       DeletionService deletionService, Configuration conf,
-      ApplicationId appId, UserGroupInformation userUgi,
+      ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
       LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
       ContainerLogsRetentionPolicy retentionPolicy,
       Map<ApplicationAccessType, String> appAcls,
@@ -111,6 +138,51 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
     this.appAcls = appAcls;
     this.logAggregationContext = logAggregationContext;
     this.context = context;
+    this.nodeId = nodeId;
+    int configuredRentionSize =
+        conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
+            DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
+    if (configuredRentionSize <= 0) {
+      this.retentionSize =
+          DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
+    } else {
+      this.retentionSize = configuredRentionSize;
+    }
+    long configuredRollingMonitorInterval =
+        this.logAggregationContext == null ? -1 : this.logAggregationContext
+            .getRollingIntervalSeconds();
+    boolean debug_mode =
+        conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED,
+            DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED);
+    if (configuredRollingMonitorInterval > 0
+        && configuredRollingMonitorInterval <
+            NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS) {
+      if (debug_mode) {
+        this.rollingMonitorInterval = configuredRollingMonitorInterval;
+      } else {
+        LOG.warn(
+            "rollingMonitorIntervall should be more than or equal to "
+                + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
+                + " seconds. Using "
+                + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
+                + " seconds instead.");
+        this.rollingMonitorInterval =
+            NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS;
+      }
+    } else {
+      if (configuredRollingMonitorInterval <= 0) {
+        LOG.warn("rollingMonitorInterval is set as "
+            + configuredRollingMonitorInterval + ". "
+            + "The log rolling mornitoring interval is disabled. "
+            + "The logs will be aggregated after this application is finished.");
+      } else {
+        LOG.warn("rollingMonitorInterval is set as "
+            + configuredRollingMonitorInterval + ". "
+            + "The logs will be aggregated every "
+            + configuredRollingMonitorInterval + " seconds");
+      }
+      this.rollingMonitorInterval = configuredRollingMonitorInterval;
+    }
   }

   private void uploadLogsForContainers() {
@@ -181,12 +253,17 @@ private void uploadLogsForContainers() {
       }
     }

+    // Before upload logs, make sure the number of existing logs
+    // is smaller than the configured NM log aggregation retention size.
+    if (uploadedLogsInThisCycle) {
+      cleanOldLogs();
+    }
+
     if (writer != null) {
       writer.close();
     }

-    final Path renamedPath = logAggregationContext == null ||
-        logAggregationContext.getRollingIntervalSeconds() <= 0
+    final Path renamedPath = this.rollingMonitorInterval <= 0
         ? remoteNodeLogFileForApp : new Path(
             remoteNodeLogFileForApp.getParent(),
             remoteNodeLogFileForApp.getName() + "_"
@@ -198,9 +275,12 @@ private void uploadLogsForContainers() {
       @Override
       public Object run() throws Exception {
         FileSystem remoteFS = FileSystem.get(conf);
-        if (remoteFS.exists(remoteNodeTmpLogFileForApp)
-            && rename) {
-          remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
+        if (remoteFS.exists(remoteNodeTmpLogFileForApp)) {
+          if (rename) {
+            remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
+          } else {
+            remoteFS.delete(remoteNodeTmpLogFileForApp, false);
+          }
         }
         return null;
       }
@@ -218,6 +298,60 @@ public Object run() throws Exception {
     }
   }

+  private void cleanOldLogs() {
+    try {
+      final FileSystem remoteFS =
+          this.remoteNodeLogFileForApp.getFileSystem(conf);
+      Path appDir =
+          this.remoteNodeLogFileForApp.getParent().makeQualified(
+              remoteFS.getUri(), remoteFS.getWorkingDirectory());
+      Set<FileStatus> status =
+          new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
+
+      Iterable<FileStatus> mask =
+          Iterables.filter(status, new Predicate<FileStatus>() {
+            @Override
+            public boolean apply(FileStatus next) {
+              return next.getPath().getName()
+                  .contains(LogAggregationUtils.getNodeString(nodeId))
+                  && !next.getPath().getName().endsWith(
+                      LogAggregationUtils.TMP_FILE_SUFFIX);
+            }
+          });
+      status = Sets.newHashSet(mask);
+      // Normally, we just need to delete one oldest log
+      // before we upload a new log.
+      // If we can not delete the older logs in this cycle,
+      // we will delete them in next cycle.
+      if (status.size() >= this.retentionSize) {
+        // sort by the lastModificationTime ascending
+        List<FileStatus> statusList = new ArrayList<FileStatus>(status);
+        Collections.sort(statusList, new Comparator<FileStatus>() {
+          public int compare(FileStatus s1, FileStatus s2) {
+            return s1.getModificationTime() < s2.getModificationTime() ? -1
+                : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
+          }
+        });
+        for (int i = 0 ; i <= statusList.size() - this.retentionSize; i++) {
+          final FileStatus remove = statusList.get(i);
+          try {
+            userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                remoteFS.delete(remove.getPath(), false);
+                return null;
+              }
+            });
+          } catch (Exception e) {
+            LOG.error("Failed to delete " + remove.getPath(), e);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to clean old logs", e);
+    }
+  }
+
   @Override
   public void run() {
     try {
@@ -235,9 +369,8 @@ private void doAppLogAggregation() {
     while (!this.appFinishing.get() && !this.aborted.get()) {
       synchronized(this) {
         try {
-          if (this.logAggregationContext != null && this.logAggregationContext
-              .getRollingIntervalSeconds() > 0) {
-            wait(this.logAggregationContext.getRollingIntervalSeconds() * 1000);
+          if (this.rollingMonitorInterval > 0) {
+            wait(this.rollingMonitorInterval * 1000);
             if (this.appFinishing.get() || this.aborted.get()) {
               break;
             }
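
Two inputs feed the new constructor logic above: the per-application rolling interval, carried in the LogAggregationContext supplied at submission time, and the NM-side cap on how many rolled-up files are kept per application per node. The sketch below shows one way they might be wired together, reusing the key string from the diff; the call to ApplicationSubmissionContext#setLogAggregationContext is an assumption about the submission-side API in this release and should be verified, and the numeric values are examples only.

    // Sketch of wiring the rolling/retention knobs; values are examples only.
    // setLogAggregationContext on ApplicationSubmissionContext is assumed here.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.api.records.LogAggregationContext;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.util.Records;

    public class RollingAggregationSetupSketch {
      public static void configure(ApplicationSubmissionContext appContext,
          Configuration nmConf) {
        // Application side: ask for hourly roll-up. AppLogAggregatorImpl clamps
        // anything below 3600 seconds unless the debug flag above is set.
        LogAggregationContext logCtx = Records.newRecord(LogAggregationContext.class);
        logCtx.setRollingIntervalSeconds(3600);
        appContext.setLogAggregationContext(logCtx);

        // NodeManager side: cap the rolled files kept per app per node; 30 is the
        // default used by AppLogAggregatorImpl.
        nmConf.setInt(YarnConfiguration.NM_PREFIX
            + "log-aggregation.num-log-files-per-app", 30);
      }
    }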

LogAggregationService.java

@@ -342,7 +342,7 @@ protected void initAppAggregator(final ApplicationId appId, String user,
     // New application
     final AppLogAggregator appLogAggregator =
         new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
-            getConfig(), appId, userUgi, dirsHandler,
+            getConfig(), appId, userUgi, this.nodeId, dirsHandler,
             getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
             appAcls, logAggregationContext, this.context);
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {

TestLogAggregationService.java

@@ -699,7 +699,7 @@ private void writeContainerLogs(File appLogDir, ContainerId containerId,
     }
   }

-  private void verifyContainerLogs(LogAggregationService logAggregationService,
+  private String verifyContainerLogs(LogAggregationService logAggregationService,
       ApplicationId appId, ContainerId[] expectedContainerIds,
       String[] logFiles, int numOfContainerLogs, boolean multiLogs)
       throws IOException {
@@ -811,6 +811,7 @@ private void verifyContainerLogs(LogAggregationService logAggregationService,
         Assert.assertEquals(0, thisContainerMap.size());
       }
       Assert.assertEquals(0, logMap.size());
+      return targetNodeFile.getPath().getName();
     } finally {
       reader.close();
     }
@@ -1219,17 +1220,32 @@ public void testLogAggregationServiceWithPatterns() throws Exception {
     dispatcher.stop();
   }

-  @SuppressWarnings("unchecked")
   @Test (timeout = 50000)
   public void testLogAggregationServiceWithInterval() throws Exception {
-    final int maxAttempts = 50;
+    testLogAggregationService(false);
+  }
+
+  @Test (timeout = 50000)
+  public void testLogAggregationServiceWithRetention() throws Exception {
+    testLogAggregationService(true);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void testLogAggregationService(boolean retentionSizeLimitation)
+      throws Exception {
     LogAggregationContext logAggregationContextWithInterval =
         Records.newRecord(LogAggregationContext.class);
     logAggregationContextWithInterval.setRollingIntervalSeconds(5000);

     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
+    if (retentionSizeLimitation) {
+      // set the retention size as 1. The number of logs for one application
+      // in one NM should be 1.
+      this.conf.setInt(YarnConfiguration.NM_PREFIX
+          + "log-aggregation.num-log-files-per-app", 1);
+    }

     // by setting this configuration, the log files will not be deleted immediately after
     // they are aggregated to remote directory.
     // We could use it to test whether the previous aggregated log files will be aggregated
@@ -1280,23 +1296,29 @@ public void testLogAggregationServiceWithInterval() throws Exception {
         .get(application);
     aggregator.doLogAggregationOutOfBand();

-    int count = 0;
-    while (numOfLogsAvailable(logAggregationService, application) != 1
-        && count <= maxAttempts) {
-      Thread.sleep(100);
-      count++;
+    if (retentionSizeLimitation) {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+          50, 1, true, null));
+    } else {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+          50, 1, false, null));
     }
+    String logFileInLastCycle = null;
     // Container logs should be uploaded
-    verifyContainerLogs(logAggregationService, application,
+    logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
         new ContainerId[] { container }, logFiles1, 3, true);

+    Thread.sleep(2000);
+
     // There is no log generated at this time. Do the log aggregation again.
     aggregator.doLogAggregationOutOfBand();

     // Same logs will not be aggregated again.
     // Only one aggregated log file in Remote file directory.
-    Assert.assertEquals(numOfLogsAvailable(logAggregationService, application),
-        1);
+    Assert.assertEquals(numOfLogsAvailable(logAggregationService,
+        application, true, null), 1);
+
+    Thread.sleep(2000);

     // Do log aggregation
     String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" };
@@ -1304,16 +1326,19 @@ public void testLogAggregationServiceWithInterval() throws Exception {
     aggregator.doLogAggregationOutOfBand();

-    count = 0;
-    while (numOfLogsAvailable(logAggregationService, application) != 2
-        && count <= maxAttempts) {
-      Thread.sleep(100);
-      count ++;
+    if (retentionSizeLimitation) {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+          50, 1, true, logFileInLastCycle));
+    } else {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+          50, 2, false, null));
     }

     // Container logs should be uploaded
-    verifyContainerLogs(logAggregationService, application,
+    logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
         new ContainerId[] { container }, logFiles2, 3, true);

+    Thread.sleep(2000);
+
     // create another logs
     String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" };
     writeContainerLogs(appLogDir, container, logFiles3);
@@ -1323,13 +1348,13 @@ public void testLogAggregationServiceWithInterval() throws Exception {
     dispatcher.await();
     logAggregationService.handle(new LogHandlerAppFinishedEvent(application));

-    count = 0;
-    while (numOfLogsAvailable(logAggregationService, application) != 3
-        && count <= maxAttempts) {
-      Thread.sleep(100);
-      count ++;
+    if (retentionSizeLimitation) {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+          50, 1, true, logFileInLastCycle));
+    } else {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+          50, 3, false, null));
     }
     verifyContainerLogs(logAggregationService, application,
         new ContainerId[] { container }, logFiles3, 3, true);
     logAggregationService.stop();
@@ -1338,7 +1363,8 @@ public void testLogAggregationServiceWithInterval() throws Exception {
   }

   private int numOfLogsAvailable(LogAggregationService logAggregationService,
-      ApplicationId appId) throws IOException {
+      ApplicationId appId, boolean sizeLimited, String lastLogFile)
+      throws IOException {
     Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
     RemoteIterator<FileStatus> nodeFiles = null;
     try {
@@ -1354,7 +1380,9 @@ private int numOfLogsAvailable(LogAggregationService logAggregationService,
     while (nodeFiles.hasNext()) {
       FileStatus status = nodeFiles.next();
       String filename = status.getPath().getName();
-      if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)) {
+      if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)
+          || (lastLogFile != null && filename.contains(lastLogFile)
+              && sizeLimited)) {
         return -1;
       }
       if (filename.contains(LogAggregationUtils
@@ -1364,4 +1392,18 @@ private int numOfLogsAvailable(LogAggregationService logAggregationService,
       }
     }
     return count;
   }
+
+  private boolean waitAndCheckLogNum(
+      LogAggregationService logAggregationService, ApplicationId application,
+      int maxAttempts, int expectNum, boolean sizeLimited, String lastLogFile)
+      throws IOException, InterruptedException {
+    int count = 0;
+    while (numOfLogsAvailable(logAggregationService, application, sizeLimited,
+        lastLogFile) != expectNum && count <= maxAttempts) {
+      Thread.sleep(500);
+      count++;
+    }
+    return numOfLogsAvailable(logAggregationService, application, sizeLimited,
+        lastLogFile) == expectNum;
+  }
 }