YARN-8273. Log aggregation does not warn if HDFS quota in target directory is exceeded (grepas via rkanter)

This commit is contained in:
Robert Kanter 2018-05-22 14:24:38 -07:00
parent 83f53e5c62
commit b22f56c471
9 changed files with 183 additions and 32 deletions

View File

@ -40,6 +40,10 @@
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
@ -547,7 +548,7 @@ public class AggregatedLogFormat {
}
@Override
public void close() {
public void close() throws DSQuotaExceededException {
try {
if (writer != null) {
writer.close();
@ -555,7 +556,16 @@ public class AggregatedLogFormat {
} catch (Exception e) {
LOG.warn("Exception closing writer", e);
} finally {
IOUtils.cleanupWithLogger(LOG, this.fsDataOStream);
try {
this.fsDataOStream.close();
} catch (DSQuotaExceededException e) {
LOG.error("Exception in closing {}",
this.fsDataOStream.getClass(), e);
throw e;
} catch (Throwable e) {
LOG.error("Exception in closing {}",
this.fsDataOStream.getClass(), e);
}
}
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* This exception class indicates an issue during log aggregation.
*/
public class LogAggregationDFSException extends YarnException {
private static final long serialVersionUID = -6691549081090183145L;
public LogAggregationDFSException() {
}
public LogAggregationDFSException(String message) {
super(message);
}
public LogAggregationDFSException(Throwable cause) {
super(cause);
}
public LogAggregationDFSException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -162,8 +162,10 @@ public abstract class LogAggregationFileController {
/**
* Close the writer.
* @throws LogAggregationDFSException if the closing of the writer fails
* (for example due to HDFS quota being exceeded)
*/
public abstract void closeWriter();
public abstract void closeWriter() throws LogAggregationDFSException;
/**
* Write the log content.

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
@ -95,10 +97,15 @@ public class LogAggregationTFileController
}
@Override
public void closeWriter() {
public void closeWriter() throws LogAggregationDFSException {
if (this.writer != null) {
this.writer.close();
this.writer = null;
try {
this.writer.close();
} catch (DSQuotaExceededException e) {
throw new LogAggregationDFSException(e);
} finally {
this.writer = null;
}
}
}

View File

@ -65,7 +65,7 @@ public final class TestContainerLogsUtils {
public static void createContainerLogFileInRemoteFS(Configuration conf,
FileSystem fs, String rootLogDir, ContainerId containerId, NodeId nodeId,
String fileName, String user, String content,
boolean deleteRemoteLogDir) throws IOException {
boolean deleteRemoteLogDir) throws Exception {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
//prepare the logs for remote directory
ApplicationId appId = containerId.getApplicationAttemptId()
@ -113,7 +113,7 @@ public final class TestContainerLogsUtils {
private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
LogAggregationFileControllerFactory factory

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
@ -263,7 +264,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return params;
}
private void uploadLogsForContainers(boolean appFinished) {
private void uploadLogsForContainers(boolean appFinished)
throws LogAggregationDFSException {
if (this.logAggregationDisabled) {
return;
}
@ -301,6 +303,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
logAggregationTimes++;
String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
DeletionTask deletionTask = null;
try {
try {
logAggregationFileController.initializeWriter(logControllerContext);
@ -327,10 +330,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
uploadedLogsInThisCycle = true;
List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
DeletionTask deletionTask = new FileDeletionTask(delService,
deletionTask = new FileDeletionTask(delService,
this.userUgi.getShortUserName(), null,
uploadedFilePathsInThisCycleList);
delService.delete(deletionTask);
}
// This container is finished, and all its logs have been uploaded,
@ -356,9 +358,23 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
logAggregationSucceedInThisCycle = false;
}
} finally {
LogAggregationDFSException exc = null;
try {
this.logAggregationFileController.closeWriter();
} catch (LogAggregationDFSException e) {
diagnosticMessage = e.getMessage();
renameTemporaryLogFileFailed = true;
logAggregationSucceedInThisCycle = false;
exc = e;
}
if (logAggregationSucceedInThisCycle && deletionTask != null) {
delService.delete(deletionTask);
}
sendLogAggregationReport(logAggregationSucceedInThisCycle,
diagnosticMessage, appFinished);
logAggregationFileController.closeWriter();
if (exc != null) {
throw exc;
}
}
}
@ -413,13 +429,18 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
diagnosticMessage, finalized);
}
@SuppressWarnings("unchecked")
@Override
public void run() {
try {
doAppLogAggregation();
} catch (LogAggregationDFSException e) {
// if the log aggregation could not be performed due to DFS issues
// let's not clean up the log files, since that can result in
// loss of logs
LOG.error("Error occurred while aggregating the log for the application "
+ appId, e);
} catch (Exception e) {
// do post clean up of log directories on any exception
// do post clean up of log directories on any other exception
LOG.error("Error occurred while aggregating the log for the application "
+ appId, e);
doAppLogAggregationPostCleanUp();
@ -434,8 +455,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
}
@SuppressWarnings("unchecked")
private void doAppLogAggregation() {
private void doAppLogAggregation() throws LogAggregationDFSException {
while (!this.appFinishing.get() && !this.aborted.get()) {
synchronized(this) {
try {
@ -452,6 +472,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
} catch (InterruptedException e) {
LOG.warn("PendingContainers queue is interrupted");
this.appFinishing.set(true);
} catch (LogAggregationDFSException e) {
this.appFinishing.set(true);
throw e;
}
}
}
@ -460,10 +483,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return;
}
// App is finished, upload the container logs.
uploadLogsForContainers(true);
try {
// App is finished, upload the container logs.
uploadLogsForContainers(true);
doAppLogAggregationPostCleanUp();
doAppLogAggregationPostCleanUp();
} catch (LogAggregationDFSException e) {
LOG.error("Error during log aggregation", e);
}
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
@ -42,7 +43,9 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@ -52,12 +55,14 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -228,10 +233,15 @@ public class TestAppLogAggregatorImpl {
config.setLong(
YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs);
LogAggregationTFileController format = spy(
new LogAggregationTFileController());
format.initialize(config, "TFile");
Context context = createContext(config);
final AppLogAggregatorInTest appLogAggregator =
createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
config, recoveredLogInitedTimeMillis,
deletionServiceWithExpectedFiles);
config, context, recoveredLogInitedTimeMillis,
deletionServiceWithExpectedFiles, format);
appLogAggregator.startContainerLogAggregation(
new ContainerLogContext(containerId, ContainerType.TASK, 0));
// set app finished flag first
@ -269,8 +279,10 @@ public class TestAppLogAggregatorImpl {
private static AppLogAggregatorInTest createAppLogAggregator(
ApplicationId applicationId, String rootLogDir,
YarnConfiguration config, long recoveredLogInitedTimeMillis,
DeletionService deletionServiceWithFilesToExpect)
YarnConfiguration config, Context context,
long recoveredLogInitedTimeMillis,
DeletionService deletionServiceWithFilesToExpect,
LogAggregationTFileController tFileController)
throws IOException {
final Dispatcher dispatcher = createNullDispatcher();
@ -284,16 +296,12 @@ public class TestAppLogAggregatorImpl {
final LogAggregationContext logAggregationContext = null;
final Map<ApplicationAccessType, String> appAcls = new HashMap<>();
final Context context = createContext(config);
final FileContext fakeLfs = mock(FileContext.class);
final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
LogAggregationTFileController format = spy(
new LogAggregationTFileController());
format.initialize(config, "TFile");
return new AppLogAggregatorInTest(dispatcher, deletionService,
config, applicationId, ugi, nodeId, dirsService,
remoteLogDirForApp, appAcls, logAggregationContext,
context, fakeLfs, recoveredLogInitedTimeMillis, format);
context, fakeLfs, recoveredLogInitedTimeMillis, tFileController);
}
/**
@ -423,4 +431,53 @@ public class TestAppLogAggregatorImpl {
this.logValue = ArgumentCaptor.forClass(LogValue.class);
}
}
@Test
public void testDFSQuotaExceeded() throws Exception {
// the expectation is that no log files are deleted if the quota has
// been exceeded, since that would result in loss of logs
DeletionService deletionServiceWithExpectedFiles =
createDeletionServiceWithExpectedFile2Delete(Collections.emptySet());
final YarnConfiguration config = new YarnConfiguration();
ApplicationId appId = ApplicationId.newInstance(1357543L, 1);
// we need a LogAggregationTFileController that throws a
// LogAggregationDFSException
LogAggregationTFileController format =
Mockito.mock(LogAggregationTFileController.class);
Mockito.doThrow(new LogAggregationDFSException())
.when(format).closeWriter();
NodeManager.NMContext context = (NMContext) createContext(config);
context.setNMLogAggregationStatusTracker(
Mockito.mock(NMLogAggregationStatusTracker.class));
final AppLogAggregatorInTest appLogAggregator =
createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
config, context, 1000L, deletionServiceWithExpectedFiles, format);
appLogAggregator.startContainerLogAggregation(
new ContainerLogContext(
ContainerId.newContainerId(
ApplicationAttemptId.newInstance(appId, 0), 0),
ContainerType.TASK, 0));
// set app finished flag first
appLogAggregator.finishLogAggregation();
appLogAggregator.run();
// verify that no files have been uploaded
ArgumentCaptor<LogValue> logValCaptor =
ArgumentCaptor.forClass(LogValue.class);
verify(appLogAggregator.getLogAggregationFileController()).write(
any(LogKey.class), logValCaptor.capture());
Set<String> filesUploaded = new HashSet<>();
LogValue logValue = logValCaptor.getValue();
for (File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
filesUploaded.add(file.getAbsolutePath());
}
verifyFilesUploaded(filesUploaded, Collections.emptySet());
}
}

View File

@ -87,7 +87,6 @@ import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.net.HttpURLConnection;
@ -356,7 +355,7 @@ public class TestNMWebServices extends JerseyTestBase {
}
@Test (timeout = 5000)
public void testContainerLogsWithNewAPI() throws IOException, JSONException{
public void testContainerLogsWithNewAPI() throws Exception {
final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0);
WebResource r = resource();
r = r.path("ws").path("v1").path("node").path("containers")
@ -365,7 +364,7 @@ public class TestNMWebServices extends JerseyTestBase {
}
@Test (timeout = 5000)
public void testContainerLogsWithOldAPI() throws IOException, JSONException{
public void testContainerLogsWithOldAPI() throws Exception {
final ContainerId containerId = BuilderUtils.newContainerId(1, 1, 0, 1);
WebResource r = resource();
r = r.path("ws").path("v1").path("node").path("containerlogs")
@ -538,7 +537,7 @@ public class TestNMWebServices extends JerseyTestBase {
}
private void testContainerLogs(WebResource r, ContainerId containerId)
throws IOException {
throws Exception {
final String containerIdStr = containerId.toString();
final ApplicationAttemptId appAttemptId = containerId
.getApplicationAttemptId();