From 6e41aec1095d9b254997430172bf3b86036424c0 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 6 Apr 2017 16:26:24 -0500 Subject: [PATCH] YARN-6288. Exceptions during aggregated log writes are mishandled. Contributed by Akira Ajisaka --- .../hadoop/yarn/client/cli/TestLogsCLI.java | 54 ++++----- .../logaggregation/AggregatedLogFormat.java | 31 +++-- .../TestAggregatedLogFormat.java | 114 +++++++++--------- .../TestAggregatedLogsBlock.java | 20 +-- .../logaggregation/AppLogAggregatorImpl.java | 68 +++++------ 5 files changed, 147 insertions(+), 140 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index aec7caec9dd..69d93ffd4e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -390,18 +390,18 @@ public class TestLogsCLI { Path path = new Path(appDir, LogAggregationUtils.getNodeString(nodeId) + System.currentTimeMillis()); - AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter(configuration, path, ugi); - writer.writeApplicationOwner(ugi.getUserName()); + try (AggregatedLogFormat.LogWriter writer = + new AggregatedLogFormat.LogWriter()) { + writer.initialize(configuration, path, ugi); + writer.writeApplicationOwner(ugi.getUserName()); - Map appAcls = - new HashMap(); - appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); - writer.writeApplicationACLs(appAcls); - writer.append(new AggregatedLogFormat.LogKey(containerId), - new AggregatedLogFormat.LogValue(rootLogDirs, containerId, - UserGroupInformation.getCurrentUser().getShortUserName())); - writer.close(); + Map appAcls = new HashMap<>(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + writer.writeApplicationACLs(appAcls); + writer.append(new AggregatedLogFormat.LogKey(containerId), + new AggregatedLogFormat.LogValue(rootLogDirs, containerId, + UserGroupInformation.getCurrentUser().getShortUserName())); + } } private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi, @@ -410,23 +410,23 @@ public class TestLogsCLI { Path path = new Path(appDir, LogAggregationUtils.getNodeString(nodeId) + System.currentTimeMillis()); - AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter(configuration, path, ugi); - writer.writeApplicationOwner(ugi.getUserName()); + try (AggregatedLogFormat.LogWriter writer = + new AggregatedLogFormat.LogWriter()) { + writer.initialize(configuration, path, ugi); + writer.writeApplicationOwner(ugi.getUserName()); - Map appAcls = - new HashMap(); - appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); - writer.writeApplicationACLs(appAcls); - DataOutputStream out = writer.getWriter().prepareAppendKey(-1); - new AggregatedLogFormat.LogKey(containerId).write(out); - out.close(); - out = writer.getWriter().prepareAppendValue(-1); - new AggregatedLogFormat.LogValue(rootLogDirs, containerId, - UserGroupInformation.getCurrentUser().getShortUserName()).write(out, - new HashSet()); - out.close(); - writer.close(); + Map appAcls = new HashMap<>(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + writer.writeApplicationACLs(appAcls); + DataOutputStream out = writer.getWriter().prepareAppendKey(-1); + new AggregatedLogFormat.LogKey(containerId).write(out); + out.close(); + out = writer.getWriter().prepareAppendValue(-1); + new AggregatedLogFormat.LogValue(rootLogDirs, containerId, + UserGroupInformation.getCurrentUser().getShortUserName()).write(out, + new HashSet()); + out.close(); + } } private YarnClient createMockYarnClient(YarnApplicationState appState) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 9fa58e1473b..5e4648a89a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Times; import com.google.common.annotations.VisibleForTesting; @@ -378,14 +377,23 @@ public class AggregatedLogFormat { * The writer that writes out the aggregated logs. */ @Private - public static class LogWriter { + public static class LogWriter implements AutoCloseable { - private final FSDataOutputStream fsDataOStream; - private final TFile.Writer writer; + private FSDataOutputStream fsDataOStream; + private TFile.Writer writer; private FileContext fc; - public LogWriter(final Configuration conf, final Path remoteAppLogFile, - UserGroupInformation userUgi) throws IOException { + /** + * Initialize the LogWriter. + * Must be called just after the instance is created. + * @param conf Configuration + * @param remoteAppLogFile remote log file path + * @param userUgi Ugi of the user + * @throws IOException Failed to initialize + */ + public void initialize(final Configuration conf, + final Path remoteAppLogFile, + UserGroupInformation userUgi) throws IOException { try { this.fsDataOStream = userUgi.doAs(new PrivilegedExceptionAction() { @@ -463,11 +471,14 @@ public class AggregatedLogFormat { } } + @Override public void close() { - try { - this.writer.close(); - } catch (IOException e) { - LOG.warn("Exception closing writer", e); + if (writer != null) { + try { + this.writer.close(); + } catch (IOException e) { + LOG.warn("Exception closing writer", e); + } } IOUtils.closeStream(fsDataOStream); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index 21e1655ccea..6447a0a766f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -140,43 +140,44 @@ public class TestAggregatedLogFormat { final int ch = filler; UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi); + try (LogWriter logWriter = new LogWriter()) { + logWriter.initialize(conf, remoteAppLogFile, ugi); - LogKey logKey = new LogKey(testContainerId); - LogValue logValue = - spy(new LogValue(Collections.singletonList(srcFileRoot.toString()), - testContainerId, ugi.getShortUserName())); + LogKey logKey = new LogKey(testContainerId); + LogValue logValue = + spy(new LogValue(Collections.singletonList(srcFileRoot.toString()), + testContainerId, ugi.getShortUserName())); - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); - Thread t = new Thread() { - public void run() { - try { - for(int i=0; i < length/3; i++) { + Thread t = new Thread() { + public void run() { + try { + for (int i = 0; i < length / 3; i++) { osw.write(ch); - } + } - latch.countDown(); + latch.countDown(); - for(int i=0; i < (2*length)/3; i++) { - osw.write(ch); + for (int i = 0; i < (2 * length) / 3; i++) { + osw.write(ch); + } + osw.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); } - osw.close(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); } - } - }; - t.start(); + }; + t.start(); - //Wait till the osw is partially written - //aggregation starts once the ows has completed 1/3rd of its work - latch.await(); + //Wait till the osw is partially written + //aggregation starts once the ows has completed 1/3rd of its work + latch.await(); - //Aggregate The Logs - logWriter.append(logKey, logValue); - logWriter.close(); + //Aggregate The Logs + logWriter.append(logKey, logValue); + } } @Test @@ -215,22 +216,22 @@ public class TestAggregatedLogFormat { writeSrcFile(srcFilePath, "stdout", numChars); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi); + try (LogWriter logWriter = new LogWriter()) { + logWriter.initialize(conf, remoteAppLogFile, ugi); + LogKey logKey = new LogKey(testContainerId); + LogValue logValue = + new LogValue(Collections.singletonList(srcFileRoot.toString()), + testContainerId, ugi.getShortUserName()); - LogKey logKey = new LogKey(testContainerId); - LogValue logValue = - new LogValue(Collections.singletonList(srcFileRoot.toString()), - testContainerId, ugi.getShortUserName()); + // When we try to open FileInputStream for stderr, it will throw out an + // IOException. Skip the log aggregation for stderr. + LogValue spyLogValue = spy(logValue); + File errorFile = new File((new Path(srcFilePath, "stderr")).toString()); + doThrow(new IOException("Mock can not open FileInputStream")).when( + spyLogValue).secureOpenFile(errorFile); - // When we try to open FileInputStream for stderr, it will throw out an IOException. - // Skip the log aggregation for stderr. - LogValue spyLogValue = spy(logValue); - File errorFile = new File((new Path(srcFilePath, "stderr")).toString()); - doThrow(new IOException("Mock can not open FileInputStream")).when( - spyLogValue).secureOpenFile(errorFile); - - logWriter.append(logKey, spyLogValue); - logWriter.close(); + logWriter.append(logKey, spyLogValue); + } // make sure permission are correct on the file FileStatus fsStatus = fs.getFileStatus(remoteAppLogFile); @@ -310,24 +311,23 @@ public class TestAggregatedLogFormat { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi); + try (LogWriter logWriter = new LogWriter()) { + logWriter.initialize(conf, remoteAppLogFile, ugi); + LogKey logKey = new LogKey(testContainerId1); + String randomUser = "randomUser"; + LogValue logValue = + spy(new LogValue(Collections.singletonList(srcFileRoot.toString()), + testContainerId1, randomUser)); - LogKey logKey = new LogKey(testContainerId1); - String randomUser = "randomUser"; - LogValue logValue = - spy(new LogValue(Collections.singletonList(srcFileRoot.toString()), - testContainerId1, randomUser)); - - // It is trying simulate a situation where first log file is owned by - // different user (probably symlink) and second one by the user itself. - // The first file should not be aggregated. Because this log file has the invalid - // user name. - when(logValue.getUser()).thenReturn(randomUser).thenReturn( - ugi.getShortUserName()); - logWriter.append(logKey, logValue); + // It is trying simulate a situation where first log file is owned by + // different user (probably symlink) and second one by the user itself. + // The first file should not be aggregated. Because this log file has the + // invalid user name. + when(logValue.getUser()).thenReturn(randomUser).thenReturn( + ugi.getShortUserName()); + logWriter.append(logKey, logValue); + } - logWriter.close(); - BufferedReader in = new BufferedReader(new FileReader(new File(remoteAppLogFile .toUri().getRawPath()))); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java index 798406de991..5a1292c5af9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java @@ -275,17 +275,19 @@ public class TestAggregatedLogsBlock { List rootLogDirs = Arrays.asList("target/logs/logs"); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - AggregatedLogFormat.LogWriter writer = new AggregatedLogFormat.LogWriter( - configuration, new Path(path), ugi); - writer.writeApplicationOwner(ugi.getUserName()); + try (AggregatedLogFormat.LogWriter writer = + new AggregatedLogFormat.LogWriter()) { + writer.initialize(configuration, new Path(path), ugi); + writer.writeApplicationOwner(ugi.getUserName()); - Map appAcls = new HashMap(); - appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); - writer.writeApplicationACLs(appAcls); + Map appAcls = new HashMap(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + writer.writeApplicationACLs(appAcls); - writer.append(new AggregatedLogFormat.LogKey("container_0_0001_01_000001"), - new AggregatedLogFormat.LogValue(rootLogDirs, containerId,UserGroupInformation.getCurrentUser().getShortUserName())); - writer.close(); + writer.append(new AggregatedLogFormat.LogKey("container_0_0001_01_000001"), + new AggregatedLogFormat.LogValue(rootLogDirs, containerId, + UserGroupInformation.getCurrentUser().getShortUserName())); + } } private void writeLogs(String dirName) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 54606a6a38c..df972f01209 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -68,7 +68,6 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; @@ -313,24 +312,21 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } } - LogWriter writer = null; + if (pendingContainerInThisCycle.isEmpty()) { + sendLogAggregationReport(true, "", appFinished); + return; + } + + logAggregationTimes++; String diagnosticMessage = ""; boolean logAggregationSucceedInThisCycle = true; - try { - if (pendingContainerInThisCycle.isEmpty()) { - return; - } - - logAggregationTimes++; - + try (LogWriter writer = new LogWriter()){ try { - writer = - new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, - this.userUgi); + writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp, + this.userUgi); // Write ACLs once when the writer is created. writer.writeApplicationACLs(appAcls); writer.writeApplicationOwner(this.userUgi.getShortUserName()); - } catch (IOException e1) { logAggregationSucceedInThisCycle = false; LOG.error("Cannot create writer for app " + this.applicationId @@ -371,11 +367,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator { cleanupOldLogTimes++; } - if (writer != null) { - writer.close(); - writer = null; - } - long currentTime = System.currentTimeMillis(); final Path renamedPath = this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp : new Path( @@ -416,29 +407,32 @@ public class AppLogAggregatorImpl implements AppLogAggregator { logAggregationSucceedInThisCycle = false; } } finally { - LogAggregationStatus logAggregationStatus = - logAggregationSucceedInThisCycle - ? LogAggregationStatus.RUNNING - : LogAggregationStatus.RUNNING_WITH_FAILURE; - sendLogAggregationReport(logAggregationStatus, diagnosticMessage); - if (appFinished) { - // If the app is finished, one extra final report with log aggregation - // status SUCCEEDED/FAILED will be sent to RM to inform the RM - // that the log aggregation in this NM is completed. - LogAggregationStatus finalLogAggregationStatus = - renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle - ? LogAggregationStatus.FAILED - : LogAggregationStatus.SUCCEEDED; - sendLogAggregationReport(finalLogAggregationStatus, ""); - } - - if (writer != null) { - writer.close(); - } + sendLogAggregationReport(logAggregationSucceedInThisCycle, + diagnosticMessage, appFinished); } } private void sendLogAggregationReport( + boolean logAggregationSucceedInThisCycle, String diagnosticMessage, + boolean appFinished) { + LogAggregationStatus logAggregationStatus = + logAggregationSucceedInThisCycle + ? LogAggregationStatus.RUNNING + : LogAggregationStatus.RUNNING_WITH_FAILURE; + sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage); + if (appFinished) { + // If the app is finished, one extra final report with log aggregation + // status SUCCEEDED/FAILED will be sent to RM to inform the RM + // that the log aggregation in this NM is completed. + LogAggregationStatus finalLogAggregationStatus = + renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle + ? LogAggregationStatus.FAILED + : LogAggregationStatus.SUCCEEDED; + sendLogAggregationReportInternal(finalLogAggregationStatus, ""); + } + } + + private void sendLogAggregationReportInternal( LogAggregationStatus logAggregationStatus, String diagnosticMessage) { LogAggregationReport report = Records.newRecord(LogAggregationReport.class);