YARN-2583. Modified AggregatedLogDeletionService to be able to delete rolling aggregated logs. Contributed by Xuan Gong.
(cherry picked from commit cb81bac002
)
This commit is contained in:
parent
281041a57e
commit
16d7144337
|
@ -270,6 +270,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2629. Made the distributed shell use the domain-based timeline ACLs.
|
YARN-2629. Made the distributed shell use the domain-based timeline ACLs.
|
||||||
(zjshen)
|
(zjshen)
|
||||||
|
|
||||||
|
YARN-2583. Modified AggregatedLogDeletionService to be able to delete rolling
|
||||||
|
aggregated logs. (Xuan Gong via zjshen)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -24,38 +24,53 @@ import java.util.TimerTask;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A service that periodically deletes aggregated logs.
|
* A service that periodically deletes aggregated logs.
|
||||||
*/
|
*/
|
||||||
@Private
|
@InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"})
|
||||||
public class AggregatedLogDeletionService extends AbstractService {
|
public class AggregatedLogDeletionService extends AbstractService {
|
||||||
private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
|
private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
|
||||||
|
|
||||||
private Timer timer = null;
|
private Timer timer = null;
|
||||||
private long checkIntervalMsecs;
|
private long checkIntervalMsecs;
|
||||||
|
private LogDeletionTask task;
|
||||||
|
|
||||||
static class LogDeletionTask extends TimerTask {
|
static class LogDeletionTask extends TimerTask {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private long retentionMillis;
|
private long retentionMillis;
|
||||||
private String suffix = null;
|
private String suffix = null;
|
||||||
private Path remoteRootLogDir = null;
|
private Path remoteRootLogDir = null;
|
||||||
|
private ApplicationClientProtocol rmClient = null;
|
||||||
|
|
||||||
public LogDeletionTask(Configuration conf, long retentionSecs) {
|
public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.retentionMillis = retentionSecs * 1000;
|
this.retentionMillis = retentionSecs * 1000;
|
||||||
this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
|
this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
|
||||||
this.remoteRootLogDir =
|
this.remoteRootLogDir =
|
||||||
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||||
|
this.rmClient = rmClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -64,11 +79,10 @@ public class AggregatedLogDeletionService extends AbstractService {
|
||||||
LOG.info("aggregated log deletion started.");
|
LOG.info("aggregated log deletion started.");
|
||||||
try {
|
try {
|
||||||
FileSystem fs = remoteRootLogDir.getFileSystem(conf);
|
FileSystem fs = remoteRootLogDir.getFileSystem(conf);
|
||||||
|
|
||||||
for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
|
for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
|
||||||
if(userDir.isDirectory()) {
|
if(userDir.isDirectory()) {
|
||||||
Path userDirPath = new Path(userDir.getPath(), suffix);
|
Path userDirPath = new Path(userDir.getPath(), suffix);
|
||||||
deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs);
|
deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -79,18 +93,36 @@ public class AggregatedLogDeletionService extends AbstractService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
|
private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
|
||||||
FileSystem fs) {
|
FileSystem fs, ApplicationClientProtocol rmClient) {
|
||||||
try {
|
try {
|
||||||
for(FileStatus appDir : fs.listStatus(dir)) {
|
for(FileStatus appDir : fs.listStatus(dir)) {
|
||||||
if(appDir.isDirectory() &&
|
if(appDir.isDirectory() &&
|
||||||
appDir.getModificationTime() < cutoffMillis) {
|
appDir.getModificationTime() < cutoffMillis) {
|
||||||
if(shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
|
boolean appTerminated =
|
||||||
|
isApplicationTerminated(ConverterUtils.toApplicationId(appDir
|
||||||
|
.getPath().getName()), rmClient);
|
||||||
|
if(appTerminated && shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
|
||||||
try {
|
try {
|
||||||
LOG.info("Deleting aggregated logs in "+appDir.getPath());
|
LOG.info("Deleting aggregated logs in "+appDir.getPath());
|
||||||
fs.delete(appDir.getPath(), true);
|
fs.delete(appDir.getPath(), true);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logIOException("Could not delete "+appDir.getPath(), e);
|
logIOException("Could not delete "+appDir.getPath(), e);
|
||||||
}
|
}
|
||||||
|
} else if (!appTerminated){
|
||||||
|
try {
|
||||||
|
for(FileStatus node: fs.listStatus(appDir.getPath())) {
|
||||||
|
if(node.getModificationTime() < cutoffMillis) {
|
||||||
|
try {
|
||||||
|
fs.delete(node.getPath(), true);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
logIOException("Could not delete "+appDir.getPath(), ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch(IOException e) {
|
||||||
|
logIOException(
|
||||||
|
"Error reading the contents of " + appDir.getPath(), e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -115,6 +147,29 @@ public class AggregatedLogDeletionService extends AbstractService {
|
||||||
}
|
}
|
||||||
return shouldDelete;
|
return shouldDelete;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean isApplicationTerminated(ApplicationId appId,
|
||||||
|
ApplicationClientProtocol rmClient) throws IOException {
|
||||||
|
ApplicationReport appReport = null;
|
||||||
|
try {
|
||||||
|
appReport =
|
||||||
|
rmClient.getApplicationReport(
|
||||||
|
GetApplicationReportRequest.newInstance(appId))
|
||||||
|
.getApplicationReport();
|
||||||
|
} catch (ApplicationNotFoundException e) {
|
||||||
|
return true;
|
||||||
|
} catch (YarnException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
YarnApplicationState currentState = appReport.getYarnApplicationState();
|
||||||
|
return currentState == YarnApplicationState.FAILED
|
||||||
|
|| currentState == YarnApplicationState.KILLED
|
||||||
|
|| currentState == YarnApplicationState.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ApplicationClientProtocol getRMClient() {
|
||||||
|
return this.rmClient;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void logIOException(String comment, IOException e) {
|
private static void logIOException(String comment, IOException e) {
|
||||||
|
@ -140,6 +195,7 @@ public class AggregatedLogDeletionService extends AbstractService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStop() throws Exception {
|
protected void serviceStop() throws Exception {
|
||||||
|
stopRMClient();
|
||||||
stopTimer();
|
stopTimer();
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
@ -156,10 +212,11 @@ public class AggregatedLogDeletionService extends AbstractService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void refreshLogRetentionSettings() {
|
public void refreshLogRetentionSettings() throws IOException {
|
||||||
if (getServiceState() == STATE.STARTED) {
|
if (getServiceState() == STATE.STARTED) {
|
||||||
Configuration conf = createConf();
|
Configuration conf = createConf();
|
||||||
setConfig(conf);
|
setConfig(conf);
|
||||||
|
stopRMClient();
|
||||||
stopTimer();
|
stopTimer();
|
||||||
scheduleLogDeletionTask();
|
scheduleLogDeletionTask();
|
||||||
} else {
|
} else {
|
||||||
|
@ -167,7 +224,7 @@ public class AggregatedLogDeletionService extends AbstractService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void scheduleLogDeletionTask() {
|
private void scheduleLogDeletionTask() throws IOException {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
|
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
|
||||||
|
@ -183,7 +240,7 @@ public class AggregatedLogDeletionService extends AbstractService {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
setLogAggCheckIntervalMsecs(retentionSecs);
|
setLogAggCheckIntervalMsecs(retentionSecs);
|
||||||
TimerTask task = new LogDeletionTask(conf, retentionSecs);
|
task = new LogDeletionTask(conf, retentionSecs, creatRMClient());
|
||||||
timer = new Timer();
|
timer = new Timer();
|
||||||
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
|
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
|
||||||
}
|
}
|
||||||
|
@ -201,4 +258,20 @@ public class AggregatedLogDeletionService extends AbstractService {
|
||||||
protected Configuration createConf() {
|
protected Configuration createConf() {
|
||||||
return new Configuration();
|
return new Configuration();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Directly create and use ApplicationClientProtocol.
|
||||||
|
// We have already marked ApplicationClientProtocol.getApplicationReport
|
||||||
|
// as @Idempotent, it will automatically take care of RM restart/failover.
|
||||||
|
@VisibleForTesting
|
||||||
|
protected ApplicationClientProtocol creatRMClient() throws IOException {
|
||||||
|
return ClientRMProxy.createRMProxy(getConfig(),
|
||||||
|
ApplicationClientProtocol.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected void stopRMClient() {
|
||||||
|
if (task != null && task.getRMClient() != null) {
|
||||||
|
RPC.stopProxy(task.getRMClient());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.logaggregation;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -27,6 +30,12 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FilterFileSystem;
|
import org.apache.hadoop.fs.FilterFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -51,7 +60,7 @@ public class TestAggregatedLogDeletionService {
|
||||||
String root = "mockfs://foo/";
|
String root = "mockfs://foo/";
|
||||||
String remoteRootLogDir = root+"tmp/logs";
|
String remoteRootLogDir = root+"tmp/logs";
|
||||||
String suffix = "logs";
|
String suffix = "logs";
|
||||||
Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
|
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
|
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
|
||||||
|
@ -70,21 +79,36 @@ public class TestAggregatedLogDeletionService {
|
||||||
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
||||||
new FileStatus[]{userDirStatus});
|
new FileStatus[]{userDirStatus});
|
||||||
|
|
||||||
|
ApplicationId appId1 =
|
||||||
|
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||||
Path userLogDir = new Path(userDir, suffix);
|
Path userLogDir = new Path(userDir, suffix);
|
||||||
Path app1Dir = new Path(userLogDir, "application_1_1");
|
Path app1Dir = new Path(userLogDir, appId1.toString());
|
||||||
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
|
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
|
||||||
|
|
||||||
Path app2Dir = new Path(userLogDir, "application_1_2");
|
ApplicationId appId2 =
|
||||||
|
ApplicationId.newInstance(System.currentTimeMillis(), 2);
|
||||||
|
Path app2Dir = new Path(userLogDir, appId2.toString());
|
||||||
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir);
|
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir);
|
||||||
|
|
||||||
Path app3Dir = new Path(userLogDir, "application_1_3");
|
ApplicationId appId3 =
|
||||||
|
ApplicationId.newInstance(System.currentTimeMillis(), 3);
|
||||||
|
Path app3Dir = new Path(userLogDir, appId3.toString());
|
||||||
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir);
|
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir);
|
||||||
|
|
||||||
Path app4Dir = new Path(userLogDir, "application_1_4");
|
ApplicationId appId4 =
|
||||||
|
ApplicationId.newInstance(System.currentTimeMillis(), 4);
|
||||||
|
Path app4Dir = new Path(userLogDir, appId4.toString());
|
||||||
FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
|
FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
|
||||||
|
|
||||||
|
ApplicationId appId5 =
|
||||||
|
ApplicationId.newInstance(System.currentTimeMillis(), 5);
|
||||||
|
Path app5Dir = new Path(userLogDir, appId5.toString());
|
||||||
|
FileStatus app5DirStatus =
|
||||||
|
new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir);
|
||||||
|
|
||||||
when(mockFs.listStatus(userLogDir)).thenReturn(
|
when(mockFs.listStatus(userLogDir)).thenReturn(
|
||||||
new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus, app4DirStatus});
|
new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus,
|
||||||
|
app4DirStatus, app5DirStatus });
|
||||||
|
|
||||||
when(mockFs.listStatus(app1Dir)).thenReturn(
|
when(mockFs.listStatus(app1Dir)).thenReturn(
|
||||||
new FileStatus[]{});
|
new FileStatus[]{});
|
||||||
|
@ -118,19 +142,54 @@ public class TestAggregatedLogDeletionService {
|
||||||
when(mockFs.listStatus(app4Dir)).thenReturn(
|
when(mockFs.listStatus(app4Dir)).thenReturn(
|
||||||
new FileStatus[]{app4Log1Status, app4Log2Status});
|
new FileStatus[]{app4Log1Status, app4Log2Status});
|
||||||
|
|
||||||
AggregatedLogDeletionService.LogDeletionTask task =
|
Path app5Log1 = new Path(app5Dir, "host1");
|
||||||
new AggregatedLogDeletionService.LogDeletionTask(conf, 1800);
|
FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app5Log1);
|
||||||
|
|
||||||
task.run();
|
Path app5Log2 = new Path(app5Dir, "host2");
|
||||||
|
FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2);
|
||||||
|
|
||||||
verify(mockFs).delete(app1Dir, true);
|
when(mockFs.listStatus(app5Dir)).thenReturn(
|
||||||
verify(mockFs, times(0)).delete(app2Dir, true);
|
new FileStatus[]{app5Log1Status, app5Log2Status});
|
||||||
verify(mockFs).delete(app3Dir, true);
|
|
||||||
verify(mockFs).delete(app4Dir, true);
|
final List<ApplicationId> finishedApplications =
|
||||||
|
Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3,
|
||||||
|
appId4));
|
||||||
|
final List<ApplicationId> runningApplications =
|
||||||
|
Collections.unmodifiableList(Arrays.asList(appId5));
|
||||||
|
|
||||||
|
AggregatedLogDeletionService deletionService =
|
||||||
|
new AggregatedLogDeletionService() {
|
||||||
|
@Override
|
||||||
|
protected ApplicationClientProtocol creatRMClient()
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
return createMockRMClient(finishedApplications,
|
||||||
|
runningApplications);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
protected void stopRMClient() {
|
||||||
|
// DO NOTHING
|
||||||
|
}
|
||||||
|
};
|
||||||
|
deletionService.init(conf);
|
||||||
|
deletionService.start();
|
||||||
|
|
||||||
|
verify(mockFs, timeout(2000)).delete(app1Dir, true);
|
||||||
|
verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
|
||||||
|
verify(mockFs, timeout(2000)).delete(app3Dir, true);
|
||||||
|
verify(mockFs, timeout(2000)).delete(app4Dir, true);
|
||||||
|
verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true);
|
||||||
|
verify(mockFs, timeout(2000)).delete(app5Log1, true);
|
||||||
|
verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true);
|
||||||
|
|
||||||
|
deletionService.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRefreshLogRetentionSettings() throws IOException {
|
public void testRefreshLogRetentionSettings() throws Exception {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
//time before 2000 sec
|
//time before 2000 sec
|
||||||
long before2000Secs = now - (2000 * 1000);
|
long before2000Secs = now - (2000 * 1000);
|
||||||
|
@ -163,13 +222,17 @@ public class TestAggregatedLogDeletionService {
|
||||||
|
|
||||||
Path userLogDir = new Path(userDir, suffix);
|
Path userLogDir = new Path(userDir, suffix);
|
||||||
|
|
||||||
|
ApplicationId appId1 =
|
||||||
|
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||||
//Set time last modified of app1Dir directory and its files to before2000Secs
|
//Set time last modified of app1Dir directory and its files to before2000Secs
|
||||||
Path app1Dir = new Path(userLogDir, "application_1_1");
|
Path app1Dir = new Path(userLogDir, appId1.toString());
|
||||||
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
|
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
|
||||||
app1Dir);
|
app1Dir);
|
||||||
|
|
||||||
|
ApplicationId appId2 =
|
||||||
|
ApplicationId.newInstance(System.currentTimeMillis(), 2);
|
||||||
//Set time last modified of app1Dir directory and its files to before50Secs
|
//Set time last modified of app1Dir directory and its files to before50Secs
|
||||||
Path app2Dir = new Path(userLogDir, "application_1_2");
|
Path app2Dir = new Path(userLogDir, appId2.toString());
|
||||||
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
|
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
|
||||||
app2Dir);
|
app2Dir);
|
||||||
|
|
||||||
|
@ -190,11 +253,27 @@ public class TestAggregatedLogDeletionService {
|
||||||
when(mockFs.listStatus(app2Dir)).thenReturn(
|
when(mockFs.listStatus(app2Dir)).thenReturn(
|
||||||
new FileStatus[] { app2Log1Status });
|
new FileStatus[] { app2Log1Status });
|
||||||
|
|
||||||
|
final List<ApplicationId> finishedApplications =
|
||||||
|
Collections.unmodifiableList(Arrays.asList(appId1, appId2));
|
||||||
|
|
||||||
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
|
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
|
||||||
@Override
|
@Override
|
||||||
protected Configuration createConf() {
|
protected Configuration createConf() {
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
|
protected ApplicationClientProtocol creatRMClient()
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
return createMockRMClient(finishedApplications, null);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
protected void stopRMClient() {
|
||||||
|
// DO NOTHING
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
deletionSvc.init(conf);
|
deletionSvc.init(conf);
|
||||||
|
@ -253,8 +332,10 @@ public class TestAggregatedLogDeletionService {
|
||||||
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
||||||
new FileStatus[]{userDirStatus});
|
new FileStatus[]{userDirStatus});
|
||||||
|
|
||||||
|
ApplicationId appId1 =
|
||||||
|
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||||
Path userLogDir = new Path(userDir, suffix);
|
Path userLogDir = new Path(userDir, suffix);
|
||||||
Path app1Dir = new Path(userLogDir, "application_1_1");
|
Path app1Dir = new Path(userLogDir, appId1.toString());
|
||||||
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
|
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
|
||||||
|
|
||||||
when(mockFs.listStatus(userLogDir)).thenReturn(
|
when(mockFs.listStatus(userLogDir)).thenReturn(
|
||||||
|
@ -266,8 +347,25 @@ public class TestAggregatedLogDeletionService {
|
||||||
when(mockFs.listStatus(app1Dir)).thenReturn(
|
when(mockFs.listStatus(app1Dir)).thenReturn(
|
||||||
new FileStatus[]{app1Log1Status});
|
new FileStatus[]{app1Log1Status});
|
||||||
|
|
||||||
|
final List<ApplicationId> finishedApplications =
|
||||||
|
Collections.unmodifiableList(Arrays.asList(appId1));
|
||||||
|
|
||||||
AggregatedLogDeletionService deletionSvc =
|
AggregatedLogDeletionService deletionSvc =
|
||||||
new AggregatedLogDeletionService();
|
new AggregatedLogDeletionService() {
|
||||||
|
@Override
|
||||||
|
protected ApplicationClientProtocol creatRMClient()
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
return createMockRMClient(finishedApplications, null);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
protected void stopRMClient() {
|
||||||
|
// DO NOTHING
|
||||||
|
}
|
||||||
|
};
|
||||||
deletionSvc.init(conf);
|
deletionSvc.init(conf);
|
||||||
deletionSvc.start();
|
deletionSvc.start();
|
||||||
|
|
||||||
|
@ -293,4 +391,54 @@ public class TestAggregatedLogDeletionService {
|
||||||
}
|
}
|
||||||
public void initialize(URI name, Configuration conf) throws IOException {}
|
public void initialize(URI name, Configuration conf) throws IOException {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static ApplicationClientProtocol createMockRMClient(
|
||||||
|
List<ApplicationId> finishedApplicaitons,
|
||||||
|
List<ApplicationId> runningApplications) throws Exception {
|
||||||
|
final ApplicationClientProtocol mockProtocol =
|
||||||
|
mock(ApplicationClientProtocol.class);
|
||||||
|
if (finishedApplicaitons != null && !finishedApplicaitons.isEmpty()) {
|
||||||
|
for (ApplicationId appId : finishedApplicaitons) {
|
||||||
|
GetApplicationReportRequest request =
|
||||||
|
GetApplicationReportRequest.newInstance(appId);
|
||||||
|
GetApplicationReportResponse response =
|
||||||
|
createApplicationReportWithFinishedApplication();
|
||||||
|
when(mockProtocol.getApplicationReport(request))
|
||||||
|
.thenReturn(response);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (runningApplications != null && !runningApplications.isEmpty()) {
|
||||||
|
for (ApplicationId appId : runningApplications) {
|
||||||
|
GetApplicationReportRequest request =
|
||||||
|
GetApplicationReportRequest.newInstance(appId);
|
||||||
|
GetApplicationReportResponse response =
|
||||||
|
createApplicationReportWithRunningApplication();
|
||||||
|
when(mockProtocol.getApplicationReport(request))
|
||||||
|
.thenReturn(response);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return mockProtocol;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static GetApplicationReportResponse
|
||||||
|
createApplicationReportWithRunningApplication() {
|
||||||
|
ApplicationReport report = mock(ApplicationReport.class);
|
||||||
|
when(report.getYarnApplicationState()).thenReturn(
|
||||||
|
YarnApplicationState.RUNNING);
|
||||||
|
GetApplicationReportResponse response =
|
||||||
|
mock(GetApplicationReportResponse.class);
|
||||||
|
when(response.getApplicationReport()).thenReturn(report);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static GetApplicationReportResponse
|
||||||
|
createApplicationReportWithFinishedApplication() {
|
||||||
|
ApplicationReport report = mock(ApplicationReport.class);
|
||||||
|
when(report.getYarnApplicationState()).thenReturn(
|
||||||
|
YarnApplicationState.FINISHED);
|
||||||
|
GetApplicationReportResponse response =
|
||||||
|
mock(GetApplicationReportResponse.class);
|
||||||
|
when(response.getApplicationReport()).thenReturn(report);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -33,6 +37,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -41,6 +46,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
|
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||||
|
@ -65,6 +72,23 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(AppLogAggregatorImpl.class);
|
.getLog(AppLogAggregatorImpl.class);
|
||||||
private static final int THREAD_SLEEP_TIME = 1000;
|
private static final int THREAD_SLEEP_TIME = 1000;
|
||||||
|
// This is temporary solution. The configuration will be deleted once
|
||||||
|
// we find a more scalable method to only write a single log file per LRS.
|
||||||
|
private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
|
||||||
|
= YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
|
||||||
|
private static final int
|
||||||
|
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
|
||||||
|
|
||||||
|
// This configuration is for debug and test purpose. By setting
|
||||||
|
// this configuration as true. We can break the lower bound of
|
||||||
|
// NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
|
||||||
|
private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
|
||||||
|
= YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
|
||||||
|
private static final boolean
|
||||||
|
DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED = false;
|
||||||
|
|
||||||
|
private static final long
|
||||||
|
NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS = 3600;
|
||||||
|
|
||||||
private final LocalDirsHandlerService dirsHandler;
|
private final LocalDirsHandlerService dirsHandler;
|
||||||
private final Dispatcher dispatcher;
|
private final Dispatcher dispatcher;
|
||||||
|
@ -85,13 +109,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
private final Map<ApplicationAccessType, String> appAcls;
|
private final Map<ApplicationAccessType, String> appAcls;
|
||||||
private final LogAggregationContext logAggregationContext;
|
private final LogAggregationContext logAggregationContext;
|
||||||
private final Context context;
|
private final Context context;
|
||||||
|
private final int retentionSize;
|
||||||
|
private final long rollingMonitorInterval;
|
||||||
|
private final NodeId nodeId;
|
||||||
|
|
||||||
private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
|
private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
|
||||||
new HashMap<ContainerId, ContainerLogAggregator>();
|
new HashMap<ContainerId, ContainerLogAggregator>();
|
||||||
|
|
||||||
public AppLogAggregatorImpl(Dispatcher dispatcher,
|
public AppLogAggregatorImpl(Dispatcher dispatcher,
|
||||||
DeletionService deletionService, Configuration conf,
|
DeletionService deletionService, Configuration conf,
|
||||||
ApplicationId appId, UserGroupInformation userUgi,
|
ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
|
||||||
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
|
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
|
||||||
ContainerLogsRetentionPolicy retentionPolicy,
|
ContainerLogsRetentionPolicy retentionPolicy,
|
||||||
Map<ApplicationAccessType, String> appAcls,
|
Map<ApplicationAccessType, String> appAcls,
|
||||||
|
@ -111,6 +138,51 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
this.appAcls = appAcls;
|
this.appAcls = appAcls;
|
||||||
this.logAggregationContext = logAggregationContext;
|
this.logAggregationContext = logAggregationContext;
|
||||||
this.context = context;
|
this.context = context;
|
||||||
|
this.nodeId = nodeId;
|
||||||
|
int configuredRentionSize =
|
||||||
|
conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
|
||||||
|
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
|
||||||
|
if (configuredRentionSize <= 0) {
|
||||||
|
this.retentionSize =
|
||||||
|
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
|
||||||
|
} else {
|
||||||
|
this.retentionSize = configuredRentionSize;
|
||||||
|
}
|
||||||
|
long configuredRollingMonitorInterval =
|
||||||
|
this.logAggregationContext == null ? -1 : this.logAggregationContext
|
||||||
|
.getRollingIntervalSeconds();
|
||||||
|
boolean debug_mode =
|
||||||
|
conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED,
|
||||||
|
DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED);
|
||||||
|
if (configuredRollingMonitorInterval > 0
|
||||||
|
&& configuredRollingMonitorInterval <
|
||||||
|
NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS) {
|
||||||
|
if (debug_mode) {
|
||||||
|
this.rollingMonitorInterval = configuredRollingMonitorInterval;
|
||||||
|
} else {
|
||||||
|
LOG.warn(
|
||||||
|
"rollingMonitorIntervall should be more than or equal to "
|
||||||
|
+ NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
|
||||||
|
+ " seconds. Using "
|
||||||
|
+ NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
|
||||||
|
+ " seconds instead.");
|
||||||
|
this.rollingMonitorInterval =
|
||||||
|
NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (configuredRollingMonitorInterval <= 0) {
|
||||||
|
LOG.warn("rollingMonitorInterval is set as "
|
||||||
|
+ configuredRollingMonitorInterval + ". "
|
||||||
|
+ "The log rolling mornitoring interval is disabled. "
|
||||||
|
+ "The logs will be aggregated after this application is finished.");
|
||||||
|
} else {
|
||||||
|
LOG.warn("rollingMonitorInterval is set as "
|
||||||
|
+ configuredRollingMonitorInterval + ". "
|
||||||
|
+ "The logs will be aggregated every "
|
||||||
|
+ configuredRollingMonitorInterval + " seconds");
|
||||||
|
}
|
||||||
|
this.rollingMonitorInterval = configuredRollingMonitorInterval;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void uploadLogsForContainers() {
|
private void uploadLogsForContainers() {
|
||||||
|
@ -181,12 +253,17 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Before upload logs, make sure the number of existing logs
|
||||||
|
// is smaller than the configured NM log aggregation retention size.
|
||||||
|
if (uploadedLogsInThisCycle) {
|
||||||
|
cleanOldLogs();
|
||||||
|
}
|
||||||
|
|
||||||
if (writer != null) {
|
if (writer != null) {
|
||||||
writer.close();
|
writer.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
final Path renamedPath = logAggregationContext == null ||
|
final Path renamedPath = this.rollingMonitorInterval <= 0
|
||||||
logAggregationContext.getRollingIntervalSeconds() <= 0
|
|
||||||
? remoteNodeLogFileForApp : new Path(
|
? remoteNodeLogFileForApp : new Path(
|
||||||
remoteNodeLogFileForApp.getParent(),
|
remoteNodeLogFileForApp.getParent(),
|
||||||
remoteNodeLogFileForApp.getName() + "_"
|
remoteNodeLogFileForApp.getName() + "_"
|
||||||
|
@ -198,9 +275,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws Exception {
|
public Object run() throws Exception {
|
||||||
FileSystem remoteFS = FileSystem.get(conf);
|
FileSystem remoteFS = FileSystem.get(conf);
|
||||||
if (remoteFS.exists(remoteNodeTmpLogFileForApp)
|
if (remoteFS.exists(remoteNodeTmpLogFileForApp)) {
|
||||||
&& rename) {
|
if (rename) {
|
||||||
remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
|
remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
|
||||||
|
} else {
|
||||||
|
remoteFS.delete(remoteNodeTmpLogFileForApp, false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -218,6 +298,60 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void cleanOldLogs() {
|
||||||
|
try {
|
||||||
|
final FileSystem remoteFS =
|
||||||
|
this.remoteNodeLogFileForApp.getFileSystem(conf);
|
||||||
|
Path appDir =
|
||||||
|
this.remoteNodeLogFileForApp.getParent().makeQualified(
|
||||||
|
remoteFS.getUri(), remoteFS.getWorkingDirectory());
|
||||||
|
Set<FileStatus> status =
|
||||||
|
new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
|
||||||
|
|
||||||
|
Iterable<FileStatus> mask =
|
||||||
|
Iterables.filter(status, new Predicate<FileStatus>() {
|
||||||
|
@Override
|
||||||
|
public boolean apply(FileStatus next) {
|
||||||
|
return next.getPath().getName()
|
||||||
|
.contains(LogAggregationUtils.getNodeString(nodeId))
|
||||||
|
&& !next.getPath().getName().endsWith(
|
||||||
|
LogAggregationUtils.TMP_FILE_SUFFIX);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
status = Sets.newHashSet(mask);
|
||||||
|
// Normally, we just need to delete one oldest log
|
||||||
|
// before we upload a new log.
|
||||||
|
// If we can not delete the older logs in this cycle,
|
||||||
|
// we will delete them in next cycle.
|
||||||
|
if (status.size() >= this.retentionSize) {
|
||||||
|
// sort by the lastModificationTime ascending
|
||||||
|
List<FileStatus> statusList = new ArrayList<FileStatus>(status);
|
||||||
|
Collections.sort(statusList, new Comparator<FileStatus>() {
|
||||||
|
public int compare(FileStatus s1, FileStatus s2) {
|
||||||
|
return s1.getModificationTime() < s2.getModificationTime() ? -1
|
||||||
|
: s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
for (int i = 0 ; i <= statusList.size() - this.retentionSize; i++) {
|
||||||
|
final FileStatus remove = statusList.get(i);
|
||||||
|
try {
|
||||||
|
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
|
@Override
|
||||||
|
public Object run() throws Exception {
|
||||||
|
remoteFS.delete(remove.getPath(), false);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Failed to delete " + remove.getPath(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Failed to clean old logs", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
@ -235,9 +369,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
while (!this.appFinishing.get() && !this.aborted.get()) {
|
while (!this.appFinishing.get() && !this.aborted.get()) {
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
try {
|
try {
|
||||||
if (this.logAggregationContext != null && this.logAggregationContext
|
if (this.rollingMonitorInterval > 0) {
|
||||||
.getRollingIntervalSeconds() > 0) {
|
wait(this.rollingMonitorInterval * 1000);
|
||||||
wait(this.logAggregationContext.getRollingIntervalSeconds() * 1000);
|
|
||||||
if (this.appFinishing.get() || this.aborted.get()) {
|
if (this.appFinishing.get() || this.aborted.get()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -342,7 +342,7 @@ public class LogAggregationService extends AbstractService implements
|
||||||
// New application
|
// New application
|
||||||
final AppLogAggregator appLogAggregator =
|
final AppLogAggregator appLogAggregator =
|
||||||
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
|
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
|
||||||
getConfig(), appId, userUgi, dirsHandler,
|
getConfig(), appId, userUgi, this.nodeId, dirsHandler,
|
||||||
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
|
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
|
||||||
appAcls, logAggregationContext, this.context);
|
appAcls, logAggregationContext, this.context);
|
||||||
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
|
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
|
||||||
|
|
|
@ -699,7 +699,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyContainerLogs(LogAggregationService logAggregationService,
|
private String verifyContainerLogs(LogAggregationService logAggregationService,
|
||||||
ApplicationId appId, ContainerId[] expectedContainerIds,
|
ApplicationId appId, ContainerId[] expectedContainerIds,
|
||||||
String[] logFiles, int numOfContainerLogs, boolean multiLogs)
|
String[] logFiles, int numOfContainerLogs, boolean multiLogs)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -811,6 +811,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
Assert.assertEquals(0, thisContainerMap.size());
|
Assert.assertEquals(0, thisContainerMap.size());
|
||||||
}
|
}
|
||||||
Assert.assertEquals(0, logMap.size());
|
Assert.assertEquals(0, logMap.size());
|
||||||
|
return targetNodeFile.getPath().getName();
|
||||||
} finally {
|
} finally {
|
||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
|
@ -1219,17 +1220,32 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
dispatcher.stop();
|
dispatcher.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test (timeout = 50000)
|
@Test (timeout = 50000)
|
||||||
public void testLogAggregationServiceWithInterval() throws Exception {
|
public void testLogAggregationServiceWithInterval() throws Exception {
|
||||||
final int maxAttempts = 50;
|
testLogAggregationService(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 50000)
|
||||||
|
public void testLogAggregationServiceWithRetention() throws Exception {
|
||||||
|
testLogAggregationService(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private void testLogAggregationService(boolean retentionSizeLimitation)
|
||||||
|
throws Exception {
|
||||||
LogAggregationContext logAggregationContextWithInterval =
|
LogAggregationContext logAggregationContextWithInterval =
|
||||||
Records.newRecord(LogAggregationContext.class);
|
Records.newRecord(LogAggregationContext.class);
|
||||||
logAggregationContextWithInterval.setRollingIntervalSeconds(5000);
|
logAggregationContextWithInterval.setRollingIntervalSeconds(5000);
|
||||||
|
|
||||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
this.remoteRootLogDir.getAbsolutePath());
|
this.remoteRootLogDir.getAbsolutePath());
|
||||||
|
if (retentionSizeLimitation) {
|
||||||
|
// set the retention size as 1. The number of logs for one application
|
||||||
|
// in one NM should be 1.
|
||||||
|
this.conf.setInt(YarnConfiguration.NM_PREFIX
|
||||||
|
+ "log-aggregation.num-log-files-per-app", 1);
|
||||||
|
}
|
||||||
|
|
||||||
// by setting this configuration, the log files will not be deleted immediately after
|
// by setting this configuration, the log files will not be deleted immediately after
|
||||||
// they are aggregated to remote directory.
|
// they are aggregated to remote directory.
|
||||||
// We could use it to test whether the previous aggregated log files will be aggregated
|
// We could use it to test whether the previous aggregated log files will be aggregated
|
||||||
|
@ -1280,23 +1296,29 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
.get(application);
|
.get(application);
|
||||||
aggregator.doLogAggregationOutOfBand();
|
aggregator.doLogAggregationOutOfBand();
|
||||||
|
|
||||||
int count = 0;
|
if (retentionSizeLimitation) {
|
||||||
while (numOfLogsAvailable(logAggregationService, application) != 1
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
&& count <= maxAttempts) {
|
50, 1, true, null));
|
||||||
Thread.sleep(100);
|
} else {
|
||||||
count++;
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
|
50, 1, false, null));
|
||||||
}
|
}
|
||||||
|
String logFileInLastCycle = null;
|
||||||
// Container logs should be uploaded
|
// Container logs should be uploaded
|
||||||
verifyContainerLogs(logAggregationService, application,
|
logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
|
||||||
new ContainerId[] { container }, logFiles1, 3, true);
|
new ContainerId[] { container }, logFiles1, 3, true);
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
// There is no log generated at this time. Do the log aggregation again.
|
// There is no log generated at this time. Do the log aggregation again.
|
||||||
aggregator.doLogAggregationOutOfBand();
|
aggregator.doLogAggregationOutOfBand();
|
||||||
|
|
||||||
// Same logs will not be aggregated again.
|
// Same logs will not be aggregated again.
|
||||||
// Only one aggregated log file in Remote file directory.
|
// Only one aggregated log file in Remote file directory.
|
||||||
Assert.assertEquals(numOfLogsAvailable(logAggregationService, application),
|
Assert.assertEquals(numOfLogsAvailable(logAggregationService,
|
||||||
1);
|
application, true, null), 1);
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
// Do log aggregation
|
// Do log aggregation
|
||||||
String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" };
|
String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" };
|
||||||
|
@ -1304,16 +1326,19 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
|
|
||||||
aggregator.doLogAggregationOutOfBand();
|
aggregator.doLogAggregationOutOfBand();
|
||||||
|
|
||||||
count = 0;
|
if (retentionSizeLimitation) {
|
||||||
while (numOfLogsAvailable(logAggregationService, application) != 2
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
&& count <= maxAttempts) {
|
50, 1, true, logFileInLastCycle));
|
||||||
Thread.sleep(100);
|
} else {
|
||||||
count ++;
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
|
50, 2, false, null));
|
||||||
}
|
}
|
||||||
// Container logs should be uploaded
|
// Container logs should be uploaded
|
||||||
verifyContainerLogs(logAggregationService, application,
|
logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
|
||||||
new ContainerId[] { container }, logFiles2, 3, true);
|
new ContainerId[] { container }, logFiles2, 3, true);
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
// create another logs
|
// create another logs
|
||||||
String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" };
|
String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" };
|
||||||
writeContainerLogs(appLogDir, container, logFiles3);
|
writeContainerLogs(appLogDir, container, logFiles3);
|
||||||
|
@ -1323,13 +1348,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
|
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
|
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
|
||||||
count = 0;
|
if (retentionSizeLimitation) {
|
||||||
while (numOfLogsAvailable(logAggregationService, application) != 3
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
&& count <= maxAttempts) {
|
50, 1, true, logFileInLastCycle));
|
||||||
Thread.sleep(100);
|
} else {
|
||||||
count ++;
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
|
50, 3, false, null));
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyContainerLogs(logAggregationService, application,
|
verifyContainerLogs(logAggregationService, application,
|
||||||
new ContainerId[] { container }, logFiles3, 3, true);
|
new ContainerId[] { container }, logFiles3, 3, true);
|
||||||
logAggregationService.stop();
|
logAggregationService.stop();
|
||||||
|
@ -1338,7 +1363,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private int numOfLogsAvailable(LogAggregationService logAggregationService,
|
private int numOfLogsAvailable(LogAggregationService logAggregationService,
|
||||||
ApplicationId appId) throws IOException {
|
ApplicationId appId, boolean sizeLimited, String lastLogFile)
|
||||||
|
throws IOException {
|
||||||
Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
|
Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
|
||||||
RemoteIterator<FileStatus> nodeFiles = null;
|
RemoteIterator<FileStatus> nodeFiles = null;
|
||||||
try {
|
try {
|
||||||
|
@ -1354,7 +1380,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
while (nodeFiles.hasNext()) {
|
while (nodeFiles.hasNext()) {
|
||||||
FileStatus status = nodeFiles.next();
|
FileStatus status = nodeFiles.next();
|
||||||
String filename = status.getPath().getName();
|
String filename = status.getPath().getName();
|
||||||
if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)
|
||||||
|
|| (lastLogFile != null && filename.contains(lastLogFile)
|
||||||
|
&& sizeLimited)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (filename.contains(LogAggregationUtils
|
if (filename.contains(LogAggregationUtils
|
||||||
|
@ -1364,4 +1392,18 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
}
|
}
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean waitAndCheckLogNum(
|
||||||
|
LogAggregationService logAggregationService, ApplicationId application,
|
||||||
|
int maxAttempts, int expectNum, boolean sizeLimited, String lastLogFile)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
int count = 0;
|
||||||
|
while (numOfLogsAvailable(logAggregationService, application, sizeLimited,
|
||||||
|
lastLogFile) != expectNum && count <= maxAttempts) {
|
||||||
|
Thread.sleep(500);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
return numOfLogsAvailable(logAggregationService, application, sizeLimited,
|
||||||
|
lastLogFile) == expectNum;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue