merge -r 1374423:1374424 from trunk. FIXES: YARN-25

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1374425 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-08-17 20:31:09 +00:00
parent 8ab1c7a47b
commit e94632c765
10 changed files with 359 additions and 2 deletions

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
import org.apache.hadoop.yarn.service.CompositeService;
/******************************************************************
@ -53,6 +54,7 @@ public class JobHistoryServer extends CompositeService {
private HistoryClientService clientService;
private JobHistory jobHistoryService;
private JHSDelegationTokenSecretManager jhsDTSecretManager;
private AggregatedLogDeletionService aggLogDelService;
public JobHistoryServer() {
super(JobHistoryServer.class.getName());
@ -74,8 +76,10 @@ public class JobHistoryServer extends CompositeService {
this.jhsDTSecretManager = createJHSSecretManager(conf);
clientService = new HistoryClientService(historyContext,
this.jhsDTSecretManager);
aggLogDelService = new AggregatedLogDeletionService();
addService(jobHistoryService);
addService(clientService);
addService(aggLogDelService);
super.init(config);
}

View File

@ -49,3 +49,5 @@ Release 0.23.3 - Unreleased
YARN-14. Symlinks to peer distributed cache files no longer work
(Jason Lowe via bobby)
YARN-25. remove old aggregated logs (Robert Evans via tgraves)

View File

@ -353,6 +353,14 @@ public class YarnConfiguration extends Configuration {
+ "log-aggregation-enable";
public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
/**
* How long to wait before deleting aggregated logs, -1 disables.
* Be careful set this too small and you will spam the name node.
*/
public static final String LOG_AGGREGATION_RETAIN_SECONDS = YARN_PREFIX
+ "log-aggregation.retain-seconds";
public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1;
/**
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
* aggregation is disabled

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.AbstractService;
public class AggregatedLogDeletionService extends AbstractService {
private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
private Timer timer = null;
static class LogDeletionTask extends TimerTask {
private Configuration conf;
private long retentionMillis;
private String suffix = null;
private Path remoteRootLogDir = null;
public LogDeletionTask(Configuration conf, long retentionSecs) {
this.conf = conf;
this.retentionMillis = retentionSecs * 1000;
this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
}
@Override
public void run() {
long cutoffMillis = System.currentTimeMillis() - retentionMillis;
LOG.info("aggregated log deletion started.");
try {
FileSystem fs = remoteRootLogDir.getFileSystem(conf);
for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
if(userDir.isDirectory()) {
Path userDirPath = new Path(userDir.getPath(), suffix);
deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs);
}
}
} catch (IOException e) {
logIOException("Error reading root log dir this deletion " +
"attempt is being aborted", e);
}
LOG.info("aggregated log deletion finished.");
}
private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
FileSystem fs) {
try {
for(FileStatus appDir : fs.listStatus(dir)) {
if(appDir.isDirectory() &&
appDir.getModificationTime() < cutoffMillis) {
if(shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
try {
LOG.info("Deleting aggregated logs in "+appDir.getPath());
fs.delete(appDir.getPath(), true);
} catch (IOException e) {
logIOException("Could not delete "+appDir.getPath(), e);
}
}
}
}
} catch (IOException e) {
logIOException("Could not read the contents of " + dir, e);
}
}
private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis,
FileSystem fs) {
boolean shouldDelete = true;
try {
for(FileStatus node: fs.listStatus(dir.getPath())) {
if(node.getModificationTime() >= cutoffMillis) {
shouldDelete = false;
break;
}
}
} catch(IOException e) {
logIOException("Error reading the contents of " + dir.getPath(), e);
shouldDelete = false;
}
return shouldDelete;
}
}
private static void logIOException(String comment, IOException e) {
if(e instanceof AccessControlException) {
String message = e.getMessage();
//TODO fix this after HADOOP-8661
message = message.split("\n")[0];
LOG.warn(comment + " " + message);
} else {
LOG.error(comment, e);
}
}
public AggregatedLogDeletionService() {
super(AggregatedLogDeletionService.class.getName());
}
public void start() {
Configuration conf = getConfig();
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
//Log aggregation is not enabled so don't bother
return;
}
long retentionSecs = conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
if(retentionSecs < 0) {
LOG.info("Log Aggregation deletion is disabled because retention is" +
" too small (" + retentionSecs + ")");
return;
}
TimerTask task = new LogDeletionTask(conf, retentionSecs);
timer = new Timer();
timer.scheduleAtFixedRate(task, 0, retentionSecs * 1000);
super.start();
}
@Override
public void stop() {
if(timer != null) {
timer.cancel();
}
super.stop();
}
}

View File

@ -1,3 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.webapp.log;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;

View File

@ -1,3 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.webapp.log;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;

View File

@ -367,6 +367,13 @@
<name>yarn.log-aggregation-enable</name>
<value>false</value>
</property>
<property>
<description>How long to keep aggregation logs before deleting them. -1 disables.
Be careful set this too small and you will spam the name node.</description>
<name>yarn.log-aggregation.retain-seconds</name>
<value>-1</value>
</property>
<property>
<description>Time in seconds to retain user logs. Only applicable if

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
import static org.mockito.Mockito.*;
public class TestAggregatedLogDeletionService {
@Test
public void testDeletion() throws Exception {
long now = System.currentTimeMillis();
long toDeleteTime = now - (2000*1000);
long toKeepTime = now - (1500*1000);
String root = "mockfs://foo/";
String remoteRootLogDir = root+"tmp/logs";
String suffix = "logs";
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
Path rootPath = new Path(root);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
Path remoteRootLogPath = new Path(remoteRootLogDir);
Path userDir = new Path(remoteRootLogPath, "me");
FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir);
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[]{userDirStatus});
Path userLogDir = new Path(userDir, suffix);
Path app1Dir = new Path(userLogDir, "application_1_1");
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
Path app2Dir = new Path(userLogDir, "application_1_2");
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir);
Path app3Dir = new Path(userLogDir, "application_1_3");
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir);
Path app4Dir = new Path(userLogDir, "application_1_4");
FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus, app4DirStatus});
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[]{});
Path app2Log1 = new Path(app2Dir, "host1");
FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1);
Path app2Log2 = new Path(app2Dir, "host2");
FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2);
when(mockFs.listStatus(app2Dir)).thenReturn(
new FileStatus[]{app2Log1Status, app2Log2Status});
Path app3Log1 = new Path(app3Dir, "host1");
FileStatus app3Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log1);
Path app3Log2 = new Path(app3Dir, "host2");
FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2);
when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :("));
when(mockFs.listStatus(app3Dir)).thenReturn(
new FileStatus[]{app3Log1Status, app3Log2Status});
Path app4Log1 = new Path(app4Dir, "host1");
FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1);
Path app4Log2 = new Path(app4Dir, "host2");
FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log2);
when(mockFs.listStatus(app4Dir)).thenReturn(
new FileStatus[]{app4Log1Status, app4Log2Status});
AggregatedLogDeletionService.LogDeletionTask task =
new AggregatedLogDeletionService.LogDeletionTask(conf, 1800);
task.run();
verify(mockFs).delete(app1Dir, true);
verify(mockFs, times(0)).delete(app2Dir, true);
verify(mockFs).delete(app3Dir, true);
verify(mockFs).delete(app4Dir, true);
}
static class MockFileSystem extends FilterFileSystem {
MockFileSystem() {
super(mock(FileSystem.class));
}
public void initialize(URI name, Configuration conf) throws IOException {}
}
}

View File

@ -70,7 +70,7 @@ public class LogAggregationService extends AbstractService implements
/*
* Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
* Group to which NMOwner belongs> App dirs will be created as 750,
* Group to which NMOwner belongs> App dirs will be created as 770,
* owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
* access / modify the files.
* <NMGroup> should obviously be a limited access group.
@ -85,7 +85,7 @@ public class LogAggregationService extends AbstractService implements
* Permissions for the Application directory.
*/
private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
.createImmutable((short) 0750);
.createImmutable((short) 0770);
private final Context context;
private final DeletionService deletionService;

View File

@ -314,6 +314,19 @@ Hadoop MapReduce Next Generation - Cluster Setup
| | | Shuffle service that needs to be set for Map Reduce applications. |
*-------------------------+-------------------------+------------------------+
* Configurations for History Server (Needs to be moved elsewhere):
*-------------------------+-------------------------+------------------------+
|| Parameter || Value || Notes |
*-------------------------+-------------------------+------------------------+
| <<<yarn.log-aggregation.retain-seconds>>> | | |
| | <-1> | |
| | | How long to keep aggregation logs before deleting them. -1 disables. |
| | | Be careful, set this too small and you will spam the name node. |
*-------------------------+-------------------------+------------------------+
* <<<conf/mapred-site.xml>>>
* Configurations for MapReduce Applications: