diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
index b3c2de61125..b736694651a 100644
--- a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
+++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.tools;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -65,6 +66,9 @@ public class HadoopArchiveLogsRunner implements Tool {
 
   private JobConf conf;
 
+  @VisibleForTesting
+  HadoopArchives hadoopArchives;
+
   private static final FsPermission HAR_DIR_PERM =
       new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.NONE);
   private static final FsPermission HAR_INNER_FILES_PERM =
@@ -72,6 +76,7 @@ public class HadoopArchiveLogsRunner implements Tool {
 
   public HadoopArchiveLogsRunner(Configuration conf) {
     setConf(conf);
+    hadoopArchives = new HadoopArchives(conf);
   }
 
   public static void main(String[] args) {
@@ -132,10 +137,10 @@
     conf.set("mapreduce.framework.name", "local");
     // Set the umask so we get 640 files and 750 dirs
     conf.set("fs.permissions.umask-mode", "027");
-    HadoopArchives ha = new HadoopArchives(conf);
+    String harName = appId + ".har";
     String[] haArgs = {
         "-archiveName",
-        appId + ".har",
+        harName,
         "-p",
         remoteAppLogDir,
         "*",
@@ -146,15 +151,26 @@
       sb.append("\n\t").append(haArg);
     }
     LOG.info(sb.toString());
-    ha.run(haArgs);
+    int exitCode = hadoopArchives.run(haArgs);
+    if (exitCode != 0) {
+      LOG.warn("Failed to create archives for " + appId);
+      return -1;
+    }
 
     FileSystem fs = null;
     // Move har file to correct location and delete original logs
     try {
       fs = FileSystem.get(conf);
-      Path harDest = new Path(remoteAppLogDir, appId + ".har");
+      Path harPath = new Path(workingDir, harName);
+      if (!fs.exists(harPath) ||
+          fs.listStatus(harPath).length == 0) {
+        LOG.warn("The created archive \"" + harName +
+            "\" is missing or empty.");
+        return -1;
+      }
+      Path harDest = new Path(remoteAppLogDir, harName);
       LOG.info("Moving har to original location");
-      fs.rename(new Path(workingDir, appId + ".har"), harDest);
+      fs.rename(harPath, harDest);
       LOG.info("Deleting original logs");
       for (FileStatus original : fs.listStatus(new Path(remoteAppLogDir),
           new PathFilter() {
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
index fad9b974ac9..5369338d486 100644
--- a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
@@ -32,112 +32,148 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Random;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 public class TestHadoopArchiveLogsRunner {
 
   private static final int FILE_SIZE_INCREMENT = 4096;
+  private static final int[] FILE_SIZES = {3, 4, 2};
+  private static final int FILE_COUNT = FILE_SIZES.length;
   private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
   static {
     new Random().nextBytes(DUMMY_DATA);
   }
 
-  @Test(timeout = 50000)
-  public void testHadoopArchiveLogs() throws Exception {
-    MiniDFSCluster dfsCluster = null;
-    FileSystem fs = null;
-    try (MiniYARNCluster yarnCluster =
-        new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
-            1, 2, 1, 1)) {
-      Configuration conf = new YarnConfiguration();
-      conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
-      yarnCluster.init(conf);
-      yarnCluster.start();
-      conf = yarnCluster.getConfig();
-      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-      conf = new JobConf(conf);
+  private Configuration conf;
+  private MiniDFSCluster dfsCluster;
+  private MiniYARNCluster yarnCluster;
+  private FileSystem fs;
+  private ApplicationId app1;
+  private Path app1Path;
+  private Path workingDir;
+  private Path remoteRootLogDir;
+  private String suffix;
 
-      ApplicationId app1 =
-          ApplicationId.newInstance(System.currentTimeMillis(), 1);
-      fs = FileSystem.get(conf);
-      Path remoteRootLogDir = new Path(conf.get(
-          YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-          YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
-      Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
-      String suffix = "logs";
-      Path logDir = new Path(remoteRootLogDir,
-          new Path(System.getProperty("user.name"), suffix));
-      fs.mkdirs(logDir);
-      Path app1Path = new Path(logDir, app1.toString());
-      fs.mkdirs(app1Path);
-      createFile(fs, new Path(app1Path, "log1"), 3);
-      createFile(fs, new Path(app1Path, "log2"), 4);
-      createFile(fs, new Path(app1Path, "log3"), 2);
-      FileStatus[] app1Files = fs.listStatus(app1Path);
-      Assert.assertEquals(3, app1Files.length);
+  @Rule
+  public Timeout globalTimeout = new Timeout(50000);
 
-      String[] args = new String[]{
-          "-appId", app1.toString(),
-          "-user", System.getProperty("user.name"),
-          "-workingDir", workingDir.toString(),
-          "-remoteRootLogDir", remoteRootLogDir.toString(),
-          "-suffix", suffix};
-      final HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(conf);
-      assertEquals(0, ToolRunner.run(halr, args));
+  @Before
+  public void setup() throws Exception {
+    yarnCluster = new MiniYARNCluster(
+        TestHadoopArchiveLogsRunner.class.getSimpleName(), 1, 2, 1, 1);
+    conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+    conf = yarnCluster.getConfig();
+    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    conf = new JobConf(conf);
 
-      fs = FileSystem.get(conf);
-      app1Files = fs.listStatus(app1Path);
-      Assert.assertEquals(1, app1Files.length);
-      FileStatus harFile = app1Files[0];
-      Assert.assertEquals(app1.toString() + ".har", harFile.getPath().getName());
-      Path harPath = new Path("har:///" + harFile.getPath().toUri().getRawPath());
-      FileStatus[] harLogs = HarFs.get(harPath.toUri(), conf).listStatus(harPath);
-      Assert.assertEquals(3, harLogs.length);
-      Arrays.sort(harLogs, new Comparator<FileStatus>() {
-        @Override
-        public int compare(FileStatus o1, FileStatus o2) {
-          return o1.getPath().getName().compareTo(o2.getPath().getName());
-        }
-      });
-      Assert.assertEquals("log1", harLogs[0].getPath().getName());
-      Assert.assertEquals(3 * FILE_SIZE_INCREMENT, harLogs[0].getLen());
-      Assert.assertEquals(
-          new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE),
-          harLogs[0].getPermission());
-      Assert.assertEquals(System.getProperty("user.name"),
-          harLogs[0].getOwner());
-      Assert.assertEquals("log2", harLogs[1].getPath().getName());
-      Assert.assertEquals(4 * FILE_SIZE_INCREMENT, harLogs[1].getLen());
-      Assert.assertEquals(
-          new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE),
-          harLogs[1].getPermission());
-      Assert.assertEquals(System.getProperty("user.name"),
-          harLogs[1].getOwner());
-      Assert.assertEquals("log3", harLogs[2].getPath().getName());
-      Assert.assertEquals(2 * FILE_SIZE_INCREMENT, harLogs[2].getLen());
-      Assert.assertEquals(
-          new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE),
-          harLogs[2].getPermission());
-      Assert.assertEquals(System.getProperty("user.name"),
-          harLogs[2].getOwner());
-      Assert.assertEquals(0, fs.listStatus(workingDir).length);
-    } finally {
-      if (fs != null) {
-        fs.close();
-      }
-      if (dfsCluster != null) {
-        dfsCluster.shutdown();
-      }
+    app1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    fs = FileSystem.get(conf);
+    remoteRootLogDir = new Path(conf.get(
+        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    workingDir = new Path(remoteRootLogDir, "archive-logs-work");
+    suffix = "logs";
+    Path logDir = new Path(remoteRootLogDir,
+        new Path(System.getProperty("user.name"), suffix));
+    fs.mkdirs(logDir);
+    app1Path = new Path(logDir, app1.toString());
+    fs.mkdirs(app1Path);
+    for (int i = 0; i < FILE_COUNT; i++) {
+      createFile(fs, new Path(app1Path, "log" + (i + 1)), FILE_SIZES[i]);
     }
+    FileStatus[] app1Files = fs.listStatus(app1Path);
+    Assert.assertEquals(FILE_COUNT, app1Files.length);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (fs != null) {
+      fs.close();
+    }
+    if (yarnCluster != null) {
+      yarnCluster.close();
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testHadoopArchiveLogs() throws Exception {
+    String[] args = getArgs();
+    final HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(conf);
+    assertEquals(0, ToolRunner.run(halr, args));
+
+    fs = FileSystem.get(conf);
+    FileStatus[] app1Files = fs.listStatus(app1Path);
+    Assert.assertEquals(1, app1Files.length);
+    FileStatus harFile = app1Files[0];
+    Assert.assertEquals(app1.toString() + ".har", harFile.getPath().getName());
+    Path harPath = new Path("har:///" + harFile.getPath().toUri().getRawPath());
+    FileStatus[] harLogs = HarFs.get(harPath.toUri(), conf).listStatus(harPath);
+    Assert.assertEquals(FILE_COUNT, harLogs.length);
+    Arrays.sort(harLogs, new Comparator<FileStatus>() {
+      @Override
+      public int compare(FileStatus o1, FileStatus o2) {
+        return o1.getPath().getName().compareTo(o2.getPath().getName());
+      }
+    });
+    for (int i = 0; i < FILE_COUNT; i++) {
+      FileStatus harLog = harLogs[i];
+      Assert.assertEquals("log" + (i + 1), harLog.getPath().getName());
+      Assert.assertEquals(FILE_SIZES[i] * FILE_SIZE_INCREMENT, harLog.getLen());
+      Assert.assertEquals(
+          new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE),
+          harLog.getPermission());
+      Assert.assertEquals(System.getProperty("user.name"),
+          harLog.getOwner());
+    }
+    Assert.assertEquals(0, fs.listStatus(workingDir).length);
+  }
+
+  @Test
+  public void testHadoopArchiveLogsWithArchiveError() throws Exception {
+    String[] args = getArgs();
+    final HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(conf);
+    HadoopArchives mockHadoopArchives = mock(HadoopArchives.class);
+    when(mockHadoopArchives.run(Mockito.any())).thenReturn(-1);
+    halr.hadoopArchives = mockHadoopArchives;
+    assertNotEquals(0, ToolRunner.run(halr, args));
+
+    // Make sure the original log files are intact
+    FileStatus[] app1Files = fs.listStatus(app1Path);
+    assertEquals(FILE_COUNT, app1Files.length);
+    for (int i = 0; i < FILE_COUNT; i++) {
+      Assert.assertEquals(FILE_SIZES[i] * FILE_SIZE_INCREMENT,
+          app1Files[i].getLen());
+    }
+  }
+
+  private String[] getArgs() {
+    return new String[]{
+        "-appId", app1.toString(),
+        "-user", System.getProperty("user.name"),
+        "-workingDir", workingDir.toString(),
+        "-remoteRootLogDir", remoteRootLogDir.toString(),
+        "-suffix", suffix};
   }
 
   private static void createFile(FileSystem fs, Path p, long sizeMultiple)