From 2112ef61e083104b80c62819bacc90f01b3065d2 Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Wed, 13 Apr 2022 12:22:17 +0900 Subject: [PATCH] YARN-10553. Refactor TestDistributedShell (#4159) (cherry picked from commit 890f2da624465473a5f401a3bcfc4bbd068289a1) Conflicts: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Co-authored-by: Ahmed Hussein <50450311+amahussein@users.noreply.github.com> --- .../distributedshell/ApplicationMaster.java | 2 +- .../applications/distributedshell/Client.java | 8 +- .../DistributedShellBaseTest.java | 607 ++++++ .../TestDSSleepingAppMaster.java | 9 +- .../distributedshell/TestDSTimelineV10.java | 843 ++++++++ .../distributedshell/TestDSTimelineV15.java | 100 + .../distributedshell/TestDSTimelineV20.java | 484 +++++ .../TestDSWithMultipleNodeManager.java | 570 ++--- .../TestDistributedShell.java | 1862 ----------------- 9 files changed, 2375 insertions(+), 2110 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV10.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV15.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV20.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index ae14d0931ab..765ca822304 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -781,7 +781,7 @@ public class ApplicationMaster { new HelpFormatter().printHelp("ApplicationMaster", opts); } - private void cleanup() { + protected void cleanup() { try { appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @Override diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 5121beb8465..212d1957aec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -1414,21 +1414,19 @@ public class Client { } int waitCount = 0; LOG.info("Waiting for Client to exit loop"); - while (!isRunning.get()) { + while (isRunning.get()) { try { Thread.sleep(50); } catch (InterruptedException ie) { // do nothing } finally { - waitCount++; - if (isRunning.get() || waitCount > 2000) { + if (++waitCount > 2000) { break; } } } - LOG.info("Stopping yarnClient within the Client"); + LOG.info("Stopping yarnClient within the DS Client"); yarnClient.stop(); - yarnClient.waitForServiceToStop(clientTimeout); LOG.info("done stopping Client"); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java new file mode 100644 index 00000000000..28cdf8f8223 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java @@ -0,0 +1,607 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.net.ServerSocketUtil; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.JarFinder; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.timeline.NameValuePair; +import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; +import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; + +/** + * Base class for testing DistributedShell features. + */ +public abstract class DistributedShellBaseTest { + protected static final int MIN_ALLOCATION_MB = 128; + protected static final int NUM_DATA_NODES = 1; + protected static final int TEST_TIME_OUT = 160000; + // set the timeout of the yarnClient to be 95% of the globalTimeout. 
+ protected static final int TEST_TIME_WINDOW_EXPIRE = + (TEST_TIME_OUT * 90) / 100; + private static final Logger LOG = + LoggerFactory.getLogger(DistributedShellBaseTest.class); + private static final String APP_MASTER_JAR = + JarFinder.getJar(ApplicationMaster.class); + private static final int NUM_NMS = 1; + // set the timeout of the yarnClient to be 95% of the globalTimeout. + private static final String YARN_CLIENT_TIMEOUT = + String.valueOf(TEST_TIME_WINDOW_EXPIRE); + private static final String[] COMMON_ARGS = { + "--jar", + APP_MASTER_JAR, + "--timeout", + YARN_CLIENT_TIMEOUT, + "--appname", + "" + }; + private static MiniDFSCluster hdfsCluster = null; + private static MiniYARNCluster yarnCluster = null; + private static String yarnSiteBackupPath = null; + private static String yarnSitePath = null; + @Rule + public Timeout globalTimeout = new Timeout(TEST_TIME_OUT, + TimeUnit.MILLISECONDS); + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule + public TestName name = new TestName(); + private Client dsClient; + private YarnConfiguration conf = null; + // location of the filesystem timeline writer for timeline service v.2 + private String timelineV2StorageDir = null; + + @BeforeClass + public static void setupUnitTests() throws Exception { + URL url = Thread.currentThread().getContextClassLoader().getResource( + "yarn-site.xml"); + if (url == null) { + throw new RuntimeException( + "Could not find 'yarn-site.xml' dummy file in classpath"); + } + // backup the original yarn-site file. + yarnSitePath = url.getPath(); + yarnSiteBackupPath = url.getPath() + "-backup"; + Files.copy(Paths.get(yarnSitePath), + Paths.get(yarnSiteBackupPath), + StandardCopyOption.COPY_ATTRIBUTES, + StandardCopyOption.REPLACE_EXISTING); + } + + @AfterClass + public static void tearDownUnitTests() throws Exception { + // shutdown the clusters. + shutdownYarnCluster(); + shutdownHdfsCluster(); + if (yarnSitePath == null || yarnSiteBackupPath == null) { + return; + } + // restore the original yarn-site file. + if (Files.exists(Paths.get(yarnSiteBackupPath))) { + Files.move(Paths.get(yarnSiteBackupPath), Paths.get(yarnSitePath), + StandardCopyOption.REPLACE_EXISTING); + } + } + + /** + * Utility function to merge two String arrays to form a new String array for + * our arguments. + * + * @param args the first set of the arguments. + * @param newArgs the second set of the arguments. + * @return a String array consists of {args, newArgs} + */ + protected static String[] mergeArgs(String[] args, String[] newArgs) { + int length = args.length + newArgs.length; + String[] result = new String[length]; + System.arraycopy(args, 0, result, 0, args.length); + System.arraycopy(newArgs, 0, result, args.length, newArgs.length); + return result; + } + + protected static String[] createArguments(Supplier testNameProvider, + String... args) { + String[] res = mergeArgs(COMMON_ARGS, args); + // set the application name so we can track down which command is running. + res[COMMON_ARGS.length - 1] = testNameProvider.get(); + return res; + } + + protected static String getSleepCommand(int sec) { + // Windows doesn't have a sleep command, ping -n does the trick + return Shell.WINDOWS ? "ping -n " + (sec + 1) + " 127.0.0.1 >nul" + : "sleep " + sec; + } + + protected static String getListCommand() { + return Shell.WINDOWS ? "dir" : "ls"; + } + + protected static String getCatCommand() { + return Shell.WINDOWS ? 
"type" : "cat"; + } + + protected static void shutdownYarnCluster() { + if (yarnCluster != null) { + try { + yarnCluster.stop(); + } finally { + yarnCluster = null; + } + } + } + + protected static void shutdownHdfsCluster() { + if (hdfsCluster != null) { + try { + hdfsCluster.shutdown(); + } finally { + hdfsCluster = null; + } + } + } + + public String getTimelineV2StorageDir() { + return timelineV2StorageDir; + } + + public void setTimelineV2StorageDir() throws Exception { + timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath(); + } + + @Before + public void setup() throws Exception { + setupInternal(NUM_NMS, new YarnConfiguration()); + } + + @After + public void tearDown() throws IOException { + cleanUpDFSClient(); + FileContext fsContext = FileContext.getLocalFSFileContext(); + fsContext + .delete( + new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), + true); + shutdownYarnCluster(); + shutdownHdfsCluster(); + } + + protected String[] createArgumentsWithAppName(String... args) { + return createArguments(() -> generateAppName(), args); + } + + protected void waitForContainersLaunch(YarnClient client, int nContainers, + AtomicReference appAttemptReportRef, + AtomicReference> containersListRef, + AtomicReference appAttemptIdRef, + AtomicReference thrownErrorRef) throws Exception { + GenericTestUtils.waitFor(() -> { + try { + List apps = client.getApplications(); + if (apps == null || apps.isEmpty()) { + return false; + } + ApplicationId appId = apps.get(0).getApplicationId(); + List appAttempts = + client.getApplicationAttempts(appId); + if (appAttempts == null || appAttempts.isEmpty()) { + return false; + } + ApplicationAttemptId attemptId = + appAttempts.get(0).getApplicationAttemptId(); + List containers = client.getContainers(attemptId); + if (containers == null || containers.size() < nContainers) { + return false; + } + containersListRef.set(containers); + appAttemptIdRef.set(attemptId); + appAttemptReportRef.set(appAttempts.get(0)); + } catch (Exception e) { + LOG.error("Exception waiting for Containers Launch", e); + thrownErrorRef.set(e); + } + return true; + }, 10, TEST_TIME_WINDOW_EXPIRE); + } + + protected abstract void customizeConfiguration(YarnConfiguration config) + throws Exception; + + protected String[] appendFlowArgsForTestDSShell(String[] args, + boolean defaultFlow) { + return args; + } + + protected String[] appendDomainArgsForTestDSShell(String[] args, + boolean haveDomain) { + String[] result = args; + if (haveDomain) { + String[] domainArgs = { + "--domain", + "TEST_DOMAIN", + "--view_acls", + "reader_user reader_group", + "--modify_acls", + "writer_user writer_group", + "--create" + }; + result = mergeArgs(args, domainArgs); + } + return result; + } + + protected Client setAndGetDSClient(Configuration config) throws Exception { + dsClient = new Client(config); + return dsClient; + } + + protected Client setAndGetDSClient(String appMasterMainClass, + Configuration config) throws Exception { + dsClient = new Client(appMasterMainClass, config); + return dsClient; + } + + protected void baseTestDSShell(boolean haveDomain, boolean defaultFlow) + throws Exception { + String[] baseArgs = createArgumentsWithAppName( + "--num_containers", + "2", + "--shell_command", + getListCommand(), + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1"); + String[] domainArgs = appendDomainArgsForTestDSShell(baseArgs, haveDomain); + String[] args = appendFlowArgsForTestDSShell(domainArgs, 
defaultFlow); + + LOG.info("Initializing DS Client"); + YarnClient yarnClient; + dsClient = setAndGetDSClient(new Configuration(yarnCluster.getConfig())); + boolean initSuccess = dsClient.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread(() -> { + try { + result.set(dsClient.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + t.start(); + + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(yarnCluster.getConfig())); + yarnClient.start(); + + AtomicInteger waitResult = new AtomicInteger(0); + AtomicReference appIdRef = + new AtomicReference<>(null); + AtomicReference appReportRef = + new AtomicReference<>(null); + GenericTestUtils.waitFor(() -> { + try { + List apps = yarnClient.getApplications(); + if (apps.size() == 0) { + return false; + } + ApplicationReport appReport = apps.get(0); + appReportRef.set(appReport); + appIdRef.set(appReport.getApplicationId()); + if (appReport.getHost().equals("N/A")) { + return false; + } + if (appReport.getRpcPort() == -1) { + waitResult.set(1); + } + if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED + && appReport.getFinalApplicationStatus() != + FinalApplicationStatus.UNDEFINED) { + return true; + } + } catch (Exception e) { + LOG.error("Exception get application from Yarn Client", e); + waitResult.set(2); + } + return waitResult.get() != 0; + }, 10, TEST_TIME_WINDOW_EXPIRE); + t.join(); + if (waitResult.get() == 2) { + // Exception was raised + Assert.fail("Exception in getting application report. Failed"); + } + if (waitResult.get() == 1) { + Assert.assertEquals("Failed waiting for expected rpc port to be -1.", + -1, appReportRef.get().getRpcPort()); + } + checkTimeline(appIdRef.get(), defaultFlow, haveDomain, appReportRef.get()); + } + + protected void baseTestDSShell(boolean haveDomain) throws Exception { + baseTestDSShell(haveDomain, true); + } + + protected void checkTimeline(ApplicationId appId, + boolean defaultFlow, boolean haveDomain, + ApplicationReport appReport) throws Exception { + TimelineDomain domain = null; + if (haveDomain) { + domain = yarnCluster.getApplicationHistoryServer() + .getTimelineStore().getDomain("TEST_DOMAIN"); + Assert.assertNotNull(domain); + Assert.assertEquals("reader_user reader_group", domain.getReaders()); + Assert.assertEquals("writer_user writer_group", domain.getWriters()); + } + TimelineEntities entitiesAttempts = yarnCluster + .getApplicationHistoryServer() + .getTimelineStore() + .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(), + null, null, null, null, null, null, null, null, null); + Assert.assertNotNull(entitiesAttempts); + Assert.assertEquals(1, entitiesAttempts.getEntities().size()); + Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents() + .size()); + Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType(), + ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString()); + Assert.assertEquals(haveDomain ? 
domain.getId() : "DEFAULT", + entitiesAttempts.getEntities().get(0).getDomainId()); + String currAttemptEntityId = + entitiesAttempts.getEntities().get(0).getEntityId(); + ApplicationAttemptId attemptId = ApplicationAttemptId.fromString( + currAttemptEntityId); + NameValuePair primaryFilter = new NameValuePair( + ApplicationMaster.APPID_TIMELINE_FILTER_NAME, + attemptId.getApplicationId().toString()); + TimelineEntities entities = yarnCluster + .getApplicationHistoryServer() + .getTimelineStore() + .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null, + null, null, null, null, primaryFilter, null, null, null); + Assert.assertNotNull(entities); + Assert.assertEquals(2, entities.getEntities().size()); + Assert.assertEquals(entities.getEntities().get(0).getEntityType(), + ApplicationMaster.DSEntity.DS_CONTAINER.toString()); + + String entityId = entities.getEntities().get(0).getEntityId(); + TimelineEntity entity = + yarnCluster.getApplicationHistoryServer().getTimelineStore() + .getEntity(entityId, + ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null); + Assert.assertNotNull(entity); + Assert.assertEquals(entityId, entity.getEntityId()); + Assert.assertEquals(haveDomain ? domain.getId() : "DEFAULT", + entities.getEntities().get(0).getDomainId()); + } + + protected String[] createArgsWithPostFix(int index, String... args) { + String[] res = mergeArgs(COMMON_ARGS, args); + // set the application name so we can track down which command is running. + res[COMMON_ARGS.length - 1] = generateAppName(String.format("%03d", + index)); + return res; + } + + protected String generateAppName() { + return generateAppName(null); + } + + protected String generateAppName(String postFix) { + return name.getMethodName().replaceFirst("test", "") + .concat(postFix == null ? "" : "-" + postFix); + } + + protected void setUpHDFSCluster() throws IOException { + if (hdfsCluster == null) { + HdfsConfiguration hdfsConfig = new HdfsConfiguration(); + hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig) + .numDataNodes(NUM_DATA_NODES).build(); + hdfsCluster.waitActive(); + } + } + + protected void setUpYarnCluster(int numNodeManagers, + YarnConfiguration yarnConfig) throws Exception { + if (yarnCluster != null) { + return; + } + yarnCluster = + new MiniYARNCluster(getClass().getSimpleName(), 1, numNodeManagers, + 1, 1); + yarnCluster.init(yarnConfig); + yarnCluster.start(); + // wait for the node managers to register. + waitForNMsToRegister(); + conf.set( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + MiniYARNCluster.getHostname() + ":" + + yarnCluster.getApplicationHistoryServer().getPort()); + Configuration yarnClusterConfig = yarnCluster.getConfig(); + yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + new File(yarnSitePath).getParent()); + // write the document to a buffer (not directly to the file, as that + // can cause the file being written to get read -which will then fail. + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + yarnClusterConfig.writeXml(bytesOut); + bytesOut.close(); + // write the bytes to the file in the classpath + OutputStream os = new FileOutputStream(yarnSitePath); + os.write(bytesOut.toByteArray()); + os.close(); + } + + protected void setupInternal(int numNodeManagers, + YarnConfiguration yarnConfig) throws Exception { + LOG.info("========== Setting UP UnitTest {}#{} ==========", + getClass().getCanonicalName(), name.getMethodName()); + LOG.info("Starting up YARN cluster. 
Timeline version {}", + getTimelineVersion()); + conf = yarnConfig; + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + MIN_ALLOCATION_MB); + // reduce the tearDown waiting time + conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 500); + conf.set("yarn.log.dir", "target"); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + // mark if we need to launch the v1 timeline server + // disable aux-service based timeline aggregators + conf.set(YarnConfiguration.NM_AUX_SERVICES, ""); + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); + + conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8"); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + conf.set("mapreduce.jobhistory.address", + "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10)); + // Enable ContainersMonitorImpl + conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, + LinuxResourceCalculatorPlugin.class.getName()); + conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, + ProcfsBasedProcessTree.class.getName()); + conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); + conf.setBoolean( + YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, true); + conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, + true); + conf.setBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, + 10); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); + // ATS version specific settings + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, + getTimelineVersion()); + // setup the configuration of relevant for each TimelineService version. + customizeConfiguration(conf); + // setup the yarn cluster. 
+ setUpYarnCluster(numNodeManagers, conf); + } + + protected NodeManager getNodeManager(int index) { + return yarnCluster.getNodeManager(index); + } + + protected MiniYARNCluster getYarnCluster() { + return yarnCluster; + } + + protected void setConfiguration(String key, String value) { + conf.set(key, value); + } + + protected Configuration getYarnClusterConfiguration() { + return yarnCluster.getConfig(); + } + + protected Configuration getConfiguration() { + return conf; + } + + protected ResourceManager getResourceManager() { + return yarnCluster.getResourceManager(); + } + + protected ResourceManager getResourceManager(int index) { + return yarnCluster.getResourceManager(index); + } + + protected Client getDSClient() { + return dsClient; + } + + protected void resetDSClient() { + dsClient = null; + } + + protected abstract float getTimelineVersion(); + + protected void cleanUpDFSClient() { + if (getDSClient() != null) { + getDSClient().sendStopSignal(); + resetDSClient(); + } + } + + private void waitForNMsToRegister() throws Exception { + GenericTestUtils.waitFor(() -> { + RMContext rmContext = yarnCluster.getResourceManager().getRMContext(); + return (rmContext.getRMNodes().size() >= NUM_NMS); + }, 100, 60000); + } + + protected MiniDFSCluster getHDFSCluster() { + return hdfsCluster; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java index 25975bf8daf..ae25ece1f82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java @@ -18,11 +18,10 @@ package org.apache.hadoop.yarn.applications.distributedshell; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TestDSSleepingAppMaster extends ApplicationMaster{ +public class TestDSSleepingAppMaster extends ApplicationMaster { private static final Logger LOG = LoggerFactory .getLogger(TestDSSleepingAppMaster.class); @@ -30,8 +29,8 @@ public class TestDSSleepingAppMaster extends ApplicationMaster{ public static void main(String[] args) { boolean result = false; + TestDSSleepingAppMaster appMaster = new TestDSSleepingAppMaster(); try { - TestDSSleepingAppMaster appMaster = new TestDSSleepingAppMaster(); boolean doRun = appMaster.init(args); if (!doRun) { System.exit(0); @@ -48,6 +47,10 @@ public class TestDSSleepingAppMaster extends ApplicationMaster{ result = appMaster.finish(); } catch (Throwable t) { System.exit(1); + } finally { + if (appMaster != null) { + appMaster.cleanup(); + } } if (result) { LOG.info("Application Master completed successfully. 
exiting"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV10.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV10.java new file mode 100644 index 00000000000..15dc1cb04ee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV10.java @@ -0,0 +1,843 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.cli.MissingArgumentException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter; +import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient; +import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.client.api.impl.TimelineWriter; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +/** + * Unit tests implementations for distributed shell on TimeLineV1. + */ +public class TestDSTimelineV10 extends DistributedShellBaseTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestDSTimelineV10.class); + + @Override + protected float getTimelineVersion() { + return 1.0f; + } + + @Override + protected void cleanUpDFSClient() { + + } + + @Test + public void testDSShellWithDomain() throws Exception { + baseTestDSShell(true); + } + + @Test + public void testDSShellWithoutDomain() throws Exception { + baseTestDSShell(false); + } + + @Test + public void testDSRestartWithPreviousRunningContainers() throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "1", + "--shell_command", + getSleepCommand(8), + "--master_memory", + "512", + "--container_memory", + "128", + "--keep_containers_across_application_attempts" + ); + + LOG.info("Initializing DS Client"); + setAndGetDSClient(TestDSFailedAppMaster.class.getName(), + new Configuration(getYarnClusterConfiguration())); + + getDSClient().init(args); + + LOG.info("Running DS Client"); + boolean result = getDSClient().run(); + LOG.info("Client run completed. 
Result={}", result); + // application should succeed + Assert.assertTrue(result); + } + + /* + * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds. + * Set attempt_failures_validity_interval as 2.5 seconds. It will check + * how many attempt failures for previous 2.5 seconds. + * The application is expected to be successful. + */ + @Test + public void testDSAttemptFailuresValidityIntervalSuccess() throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "1", + "--shell_command", + getSleepCommand(8), + "--master_memory", + "512", + "--container_memory", + "128", + "--attempt_failures_validity_interval", + "2500" + ); + + LOG.info("Initializing DS Client"); + Configuration config = getYarnClusterConfiguration(); + config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + setAndGetDSClient(TestDSSleepingAppMaster.class.getName(), + new Configuration(config)); + + getDSClient().init(args); + + LOG.info("Running DS Client"); + boolean result = getDSClient().run(); + + LOG.info("Client run completed. Result=" + result); + // application should succeed + Assert.assertTrue(result); + } + + /* + * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds. + * Set attempt_failures_validity_interval as 15 seconds. It will check + * how many attempt failure for previous 15 seconds. + * The application is expected to be fail. + */ + @Test + public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "1", + "--shell_command", + getSleepCommand(8), + "--master_memory", + "512", + "--container_memory", + "128", + "--attempt_failures_validity_interval", + "15000" + ); + + LOG.info("Initializing DS Client"); + Configuration config = getYarnClusterConfiguration(); + config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + setAndGetDSClient(TestDSSleepingAppMaster.class.getName(), + new Configuration(config)); + + getDSClient().init(args); + + LOG.info("Running DS Client"); + boolean result = getDSClient().run(); + + LOG.info("Client run completed. 
Result=" + result); + // application should be failed + Assert.assertFalse(result); + } + + @Test + public void testDSShellWithCustomLogPropertyFile() throws Exception { + final File basedir = getBaseDirForTest(); + final File tmpDir = new File(basedir, "tmpDir"); + tmpDir.mkdirs(); + final File customLogProperty = new File(tmpDir, "custom_log4j.properties"); + if (customLogProperty.exists()) { + customLogProperty.delete(); + } + if (!customLogProperty.createNewFile()) { + Assert.fail("Can not create custom log4j property file."); + } + PrintWriter fileWriter = new PrintWriter(customLogProperty); + // set the output to DEBUG level + fileWriter.write("log4j.rootLogger=debug,stdout"); + fileWriter.close(); + String[] args = createArgumentsWithAppName( + "--num_containers", + "3", + "--shell_command", + "echo", + "--shell_args", + "HADOOP", + "--log_properties", + customLogProperty.getAbsolutePath(), + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1" + ); + + // Before run the DS, the default the log level is INFO + final Logger LOG_Client = + LoggerFactory.getLogger(Client.class); + Assert.assertTrue(LOG_Client.isInfoEnabled()); + Assert.assertFalse(LOG_Client.isDebugEnabled()); + final Logger LOG_AM = LoggerFactory.getLogger(ApplicationMaster.class); + Assert.assertTrue(LOG_AM.isInfoEnabled()); + Assert.assertFalse(LOG_AM.isDebugEnabled()); + + LOG.info("Initializing DS Client"); + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + boolean initSuccess = getDSClient().init(args); + Assert.assertTrue(initSuccess); + + LOG.info("Running DS Client"); + boolean result = getDSClient().run(); + LOG.info("Client run completed. Result=" + result); + Assert.assertTrue(verifyContainerLog(3, null, true, "DEBUG") > 10); + //After DS is finished, the log level should be DEBUG + Assert.assertTrue(LOG_Client.isInfoEnabled()); + Assert.assertTrue(LOG_Client.isDebugEnabled()); + Assert.assertTrue(LOG_AM.isInfoEnabled()); + Assert.assertTrue(LOG_AM.isDebugEnabled()); + } + + @Test + public void testSpecifyingLogAggregationContext() throws Exception { + String regex = ".*(foo|bar)\\d"; + String[] args = createArgumentsWithAppName( + "--shell_command", + "echo", + "--rolling_log_pattern", + regex + ); + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + Assert.assertTrue(getDSClient().init(args)); + + ApplicationSubmissionContext context = + Records.newRecord(ApplicationSubmissionContext.class); + getDSClient().specifyLogAggregationContext(context); + LogAggregationContext logContext = context.getLogAggregationContext(); + assertEquals(logContext.getRolledLogsIncludePattern(), regex); + assertTrue(logContext.getRolledLogsExcludePattern().isEmpty()); + } + + @Test + public void testDSShellWithMultipleArgs() throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "4", + "--shell_command", + "echo", + "--shell_args", + "HADOOP YARN MAPREDUCE HDFS", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1" + ); + + LOG.info("Initializing DS Client"); + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + boolean initSuccess = getDSClient().init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + + boolean result = getDSClient().run(); + LOG.info("Client run completed. 
Result=" + result); + List expectedContent = new ArrayList<>(); + expectedContent.add("HADOOP YARN MAPREDUCE HDFS"); + verifyContainerLog(4, expectedContent, false, ""); + } + + @Test + public void testDSShellWithShellScript() throws Exception { + final File basedir = getBaseDirForTest(); + final File tmpDir = new File(basedir, "tmpDir"); + tmpDir.mkdirs(); + final File customShellScript = new File(tmpDir, "custom_script.sh"); + if (customShellScript.exists()) { + customShellScript.delete(); + } + if (!customShellScript.createNewFile()) { + Assert.fail("Can not create custom shell script file."); + } + PrintWriter fileWriter = new PrintWriter(customShellScript); + // set the output to DEBUG level + fileWriter.write("echo testDSShellWithShellScript"); + fileWriter.close(); + LOG.info(customShellScript.getAbsolutePath()); + String[] args = createArgumentsWithAppName( + "--num_containers", + "1", + "--shell_script", + customShellScript.getAbsolutePath(), + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1" + ); + + LOG.info("Initializing DS Client"); + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + Assert.assertTrue(getDSClient().init(args)); + LOG.info("Running DS Client"); + assertTrue(getDSClient().run()); + List expectedContent = new ArrayList<>(); + expectedContent.add("testDSShellWithShellScript"); + verifyContainerLog(1, expectedContent, false, ""); + } + + @Test + public void testDSShellWithInvalidArgs() throws Exception { + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + int appNameCounter = 0; + LOG.info("Initializing DS Client with no args"); + LambdaTestUtils.intercept(IllegalArgumentException.class, + "No args", + () -> getDSClient().init(new String[]{})); + + LOG.info("Initializing DS Client with no jar file"); + String[] noJarArgs = createArgsWithPostFix(appNameCounter++, + "--num_containers", + "2", + "--shell_command", + getListCommand(), + "--master_memory", + "512", + "--container_memory", + "128" + ); + String[] argsNoJar = Arrays.copyOfRange(noJarArgs, 2, noJarArgs.length); + LambdaTestUtils.intercept(IllegalArgumentException.class, + "No jar", + () -> getDSClient().init(argsNoJar)); + + LOG.info("Initializing DS Client with no shell command"); + String[] noShellCmdArgs = createArgsWithPostFix(appNameCounter++, + "--num_containers", + "2", + "--master_memory", + "512", + "--container_memory", + "128" + ); + LambdaTestUtils.intercept(IllegalArgumentException.class, + "No shell command", + () -> getDSClient().init(noShellCmdArgs)); + + LOG.info("Initializing DS Client with invalid no. of containers"); + + String[] numContainersArgs = createArgsWithPostFix(appNameCounter++, + "--num_containers", + "-1", + "--shell_command", + getListCommand(), + "--master_memory", + "512", + "--container_memory", + "128" + ); + LambdaTestUtils.intercept(IllegalArgumentException.class, + "Invalid no. of containers", + () -> getDSClient().init(numContainersArgs)); + + LOG.info("Initializing DS Client with invalid no. 
of vcores"); + + String[] vCoresArgs = createArgsWithPostFix(appNameCounter++, + "--num_containers", + "2", + "--shell_command", + getListCommand(), + "--master_memory", + "512", + "--master_vcores", + "-2", + "--container_memory", + "128", + "--container_vcores", + "1" + ); + getDSClient().init(vCoresArgs); + + LambdaTestUtils.intercept(IllegalArgumentException.class, + "Invalid virtual cores specified", + () -> { + getDSClient().init(vCoresArgs); + getDSClient().run(); + }); + + LOG.info("Initializing DS Client with --shell_command and --shell_script"); + + String[] scriptAndCmdArgs = createArgsWithPostFix(appNameCounter++, + "--num_containers", + "2", + "--shell_command", + getListCommand(), + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--shell_script", + "test.sh" + ); + + LambdaTestUtils.intercept(IllegalArgumentException.class, + "Can not specify shell_command option and shell_script option at " + + "the same time", + () -> getDSClient().init(scriptAndCmdArgs)); + + LOG.info( + "Initializing DS Client without --shell_command and --shell_script"); + + String[] noShellCmdNoScriptArgs = createArgsWithPostFix(appNameCounter++, + "--num_containers", + "2", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1" + ); + LambdaTestUtils.intercept(IllegalArgumentException.class, + "No shell command or shell script specified " + + "to be executed by application master", + () -> getDSClient().init(noShellCmdNoScriptArgs)); + + LOG.info("Initializing DS Client with invalid container_type argument"); + String[] invalidTypeArgs = createArgsWithPostFix(appNameCounter++, + "--num_containers", + "2", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--shell_command", + "date", + "--container_type", + "UNSUPPORTED_TYPE" + ); + LambdaTestUtils.intercept(IllegalArgumentException.class, + "Invalid container_type: UNSUPPORTED_TYPE", + () -> getDSClient().init(invalidTypeArgs)); + + String[] invalidMemArgs = createArgsWithPostFix(appNameCounter++, + "--num_containers", + "1", + "--shell_command", + getListCommand(), + "--master_resources", + "memory-mb=invalid" + ); + LambdaTestUtils.intercept(IllegalArgumentException.class, + () -> getDSClient().init(invalidMemArgs)); + + String[] invalidMasterResArgs = createArgsWithPostFix(appNameCounter++, + "--num_containers", + "1", + "--shell_command", + getListCommand(), + "--master_resources" + ); + LambdaTestUtils.intercept(MissingArgumentException.class, + () -> getDSClient().init(invalidMasterResArgs)); + } + + @Test + public void testDSTimelineClientWithConnectionRefuse() throws Exception { + ApplicationMaster am = new ApplicationMaster(); + final AtomicReference spyTimelineWriterRef = + new AtomicReference<>(null); + TimelineClientImpl client = new TimelineClientImpl() { + @Override + protected TimelineWriter createTimelineWriter(Configuration conf, + UserGroupInformation authUgi, com.sun.jersey.api.client.Client client, + URI resURI) throws IOException { + TimelineWriter timelineWriter = + new DirectTimelineWriter(authUgi, client, resURI); + spyTimelineWriterRef.set(spy(timelineWriter)); + return spyTimelineWriterRef.get(); + } + }; + client.init(getConfiguration()); + client.start(); + TestTimelineClient.mockEntityClientResponse(spyTimelineWriterRef.get(), + null, false, true); + try { + UserGroupInformation ugi = 
mock(UserGroupInformation.class); + when(ugi.getShortUserName()).thenReturn("user1"); + // verify no ClientHandlerException get thrown out. + am.publishContainerEndEvent(client, ContainerStatus.newInstance( + BuilderUtils.newContainerId(1, 1, 1, 1), ContainerState.COMPLETE, "", + 1), "domainId", ugi); + } finally { + client.stop(); + } + } + + @Test + public void testContainerLaunchFailureHandling() throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "2", + "--shell_command", + getListCommand(), + "--master_memory", + "512", + "--container_memory", + "128" + ); + + LOG.info("Initializing DS Client"); + setAndGetDSClient(ContainerLaunchFailAppMaster.class.getName(), + new Configuration(getYarnClusterConfiguration())); + Assert.assertTrue(getDSClient().init(args)); + LOG.info("Running DS Client"); + Assert.assertFalse(getDSClient().run()); + } + + @Test + public void testDebugFlag() throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "2", + "--shell_command", + getListCommand(), + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--debug" + ); + + LOG.info("Initializing DS Client"); + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + Assert.assertTrue(getDSClient().init(args)); + LOG.info("Running DS Client"); + Assert.assertTrue(getDSClient().run()); + } + + private int verifyContainerLog(int containerNum, + List expectedContent, boolean count, String expectedWord) { + File logFolder = + new File(getNodeManager(0).getConfig() + .get(YarnConfiguration.NM_LOG_DIRS, + YarnConfiguration.DEFAULT_NM_LOG_DIRS)); + + File[] listOfFiles = logFolder.listFiles(); + Assert.assertNotNull(listOfFiles); + int currentContainerLogFileIndex = -1; + for (int i = listOfFiles.length - 1; i >= 0; i--) { + if (listOfFiles[i].listFiles().length == containerNum + 1) { + currentContainerLogFileIndex = i; + break; + } + } + Assert.assertTrue(currentContainerLogFileIndex != -1); + File[] containerFiles = + listOfFiles[currentContainerLogFileIndex].listFiles(); + + int numOfWords = 0; + for (File containerFile : containerFiles) { + if (containerFile == null) { + continue; + } + for (File output : containerFile.listFiles()) { + if (output.getName().trim().contains("stdout")) { + List stdOutContent = new ArrayList<>(); + try (BufferedReader br = new BufferedReader(new FileReader(output))) { + String sCurrentLine; + + int numOfline = 0; + while ((sCurrentLine = br.readLine()) != null) { + if (count) { + if (sCurrentLine.contains(expectedWord)) { + numOfWords++; + } + } else if (output.getName().trim().equals("stdout")) { + if (!Shell.WINDOWS) { + Assert.assertEquals("The current is" + sCurrentLine, + expectedContent.get(numOfline), sCurrentLine.trim()); + numOfline++; + } else { + stdOutContent.add(sCurrentLine.trim()); + } + } + } + /* By executing bat script using cmd /c, + * it will output all contents from bat script first + * It is hard for us to do check line by line + * Simply check whether output from bat file contains + * all the expected messages + */ + if (Shell.WINDOWS && !count + && output.getName().trim().equals("stdout")) { + Assert.assertTrue(stdOutContent.containsAll(expectedContent)); + } + } catch (IOException e) { + LOG.error("Exception reading the buffer", e); + } + } + } + } + return numOfWords; + } + + @Test + public void testDistributedShellResourceProfiles() throws Exception { + int appNameCounter = 0; + String[][] args = { + 
createArgsWithPostFix(appNameCounter++, + "--num_containers", "1", "--shell_command", + getListCommand(), "--container_resource_profile", + "maximum"), + createArgsWithPostFix(appNameCounter++, + "--num_containers", "1", "--shell_command", + getListCommand(), "--master_resource_profile", + "default"), + createArgsWithPostFix(appNameCounter++, + "--num_containers", "1", "--shell_command", + getListCommand(), "--master_resource_profile", + "default", "--container_resource_profile", "maximum"), + }; + + for (int i = 0; i < args.length; ++i) { + LOG.info("Initializing DS Client[{}]", i); + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + Assert.assertTrue(getDSClient().init(args[i])); + LOG.info("Running DS Client[{}]", i); + LambdaTestUtils.intercept(Exception.class, + () -> getDSClient().run()); + } + } + + @Test + public void testDSShellWithOpportunisticContainers() throws Exception { + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + + String[] args = createArgumentsWithAppName( + "--num_containers", + "2", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--shell_command", + "date", + "--container_type", + "OPPORTUNISTIC" + ); + assertTrue(getDSClient().init(args)); + assertTrue(getDSClient().run()); + } + + @Test(expected = ResourceNotFoundException.class) + public void testDistributedShellAMResourcesWithUnknownResource() + throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "1", + "--shell_command", + getListCommand(), + "--master_resources", + "unknown-resource=5" + ); + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + assertTrue(getDSClient().init(args)); + getDSClient().run(); + } + + @Test(expected = IllegalArgumentException.class) + public void testDistributedShellNonExistentQueue() + throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "1", + "--shell_command", + getListCommand(), + "--queue", + "non-existent-queue" + ); + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + assertTrue(getDSClient().init(args)); + getDSClient().run(); + } + + @Test + public void testDistributedShellWithSingleFileLocalization() + throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "1", + "--shell_command", + getCatCommand(), + "--localize_files", + "./src/test/resources/a.txt", + "--shell_args", + "a.txt" + ); + + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + assertTrue(getDSClient().init(args)); + assertTrue("Client exited with an error", getDSClient().run()); + } + + @Test + public void testDistributedShellWithMultiFileLocalization() + throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "1", + "--shell_command", + getCatCommand(), + "--localize_files", + "./src/test/resources/a.txt,./src/test/resources/b.txt", + "--shell_args", + "a.txt b.txt" + ); + + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + assertTrue(getDSClient().init(args)); + assertTrue("Client exited with an error", getDSClient().run()); + } + + @Test(expected = UncheckedIOException.class) + public void testDistributedShellWithNonExistentFileLocalization() + throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "1", + "--shell_command", + getCatCommand(), + "--localize_files", + "/non/existing/path/file.txt", + "--shell_args", + "file.txt" + ); + + 
setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + assertTrue(getDSClient().init(args)); + assertTrue(getDSClient().run()); + } + + @Test + public void testDistributedShellCleanup() + throws Exception { + String[] args = createArgumentsWithAppName( + "--num_containers", + "1", + "--shell_command", + getListCommand() + ); + Configuration config = new Configuration(getYarnClusterConfiguration()); + setAndGetDSClient(config); + + assertTrue(getDSClient().init(args)); + assertTrue(getDSClient().run()); + ApplicationId appId = getDSClient().getAppId(); + String relativePath = + ApplicationMaster.getRelativePath(generateAppName(), + appId.toString(), ""); + FileSystem fs1 = FileSystem.get(config); + Path path = new Path(fs1.getHomeDirectory(), relativePath); + + GenericTestUtils.waitFor(() -> { + try { + return !fs1.exists(path); + } catch (IOException e) { + return false; + } + }, 10, 60000); + + assertFalse("Distributed Shell Cleanup failed", fs1.exists(path)); + } + + @Override + protected void customizeConfiguration( + YarnConfiguration config) throws Exception { + config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT); + } + + private static File getBaseDirForTest() { + return new File("target", TestDSTimelineV10.class.getName()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV15.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV15.java new file mode 100644 index 00000000000..634bac4df43 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV15.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; + +/** + * Unit tests implementations for distributed shell on TimeLineV1.5. + */ +public class TestDSTimelineV15 extends DistributedShellBaseTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestDSTimelineV15.class); + + @Override + protected float getTimelineVersion() { + return 1.5f; + } + + @Override + protected void customizeConfiguration( + YarnConfiguration config) throws Exception { + setUpHDFSCluster(); + PluginStoreTestUtils.prepareFileSystemForPluginStore( + getHDFSCluster().getFileSystem()); + PluginStoreTestUtils.prepareConfiguration(config, getHDFSCluster()); + config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES, + DistributedShellTimelinePlugin.class.getName()); + } + + @Override + protected void checkTimeline(ApplicationId appId, + boolean defaultFlow, boolean haveDomain, + ApplicationReport appReport) throws Exception { + long scanInterval = getConfiguration().getLong( + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS, + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT + ); + Path doneDir = new Path( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT + ); + // Wait till the data is moved to done dir, or timeout and fail + AtomicReference exceptionRef = new AtomicReference<>(null); + GenericTestUtils.waitFor(() -> { + try { + RemoteIterator iterApps = + getHDFSCluster().getFileSystem().listStatusIterator(doneDir); + return (iterApps.hasNext()); + } catch (Exception e) { + exceptionRef.set(e); + LOG.error("Exception listing Done Dir", e); + return true; + } + }, scanInterval * 2, TEST_TIME_WINDOW_EXPIRE); + Assert.assertNull("Exception in getting listing status", + exceptionRef.get()); + super.checkTimeline(appId, defaultFlow, haveDomain, appReport); + } + + @Test + public void testDSShellWithDomain() throws Exception { + baseTestDSShell(true); + } + + @Test + public void testDSShellWithoutDomain() throws Exception { + baseTestDSShell(false); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV20.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV20.java new file mode 100644 index 00000000000..caf9d3b8de7 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV20.java @@ -0,0 +1,484 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +/** + * Unit tests implementations for distributed shell on TimeLineV2. + */ +public class TestDSTimelineV20 extends DistributedShellBaseTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestDSTimelineV20.class); + private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector"; + + @Override + protected float getTimelineVersion() { + return 2.0f; + } + + @Override + protected void customizeConfiguration( + YarnConfiguration config) throws Exception { + // disable v1 timeline server since we no longer have a server here + // enable aux-service based timeline aggregators + config.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); + config.set(YarnConfiguration.NM_AUX_SERVICES + "." + + TIMELINE_AUX_SERVICE_NAME + ".class", + PerNodeTimelineCollectorsAuxService.class.getName()); + config.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, + org.apache.hadoop.yarn.server.timelineservice.storage. 
+ TimelineWriter.class); + setTimelineV2StorageDir(); + // set the file system timeline writer storage directory + config.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT, + getTimelineV2StorageDir()); + } + + @Test + public void testDSShellWithEnforceExecutionType() throws Exception { + YarnClient yarnClient = null; + AtomicReference thrownError = new AtomicReference<>(null); + AtomicReference> containersListRef = + new AtomicReference<>(null); + AtomicReference appAttemptIdRef = + new AtomicReference<>(null); + AtomicReference appAttemptReportRef = + new AtomicReference<>(null); + String[] args = createArgumentsWithAppName( + "--num_containers", + "2", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--shell_command", + getListCommand(), + "--container_type", + "OPPORTUNISTIC", + "--enforce_execution_type" + ); + try { + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + getDSClient().init(args); + Thread dsClientRunner = new Thread(() -> { + try { + getDSClient().run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + dsClientRunner.start(); + + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(getYarnClusterConfiguration())); + yarnClient.start(); + + // expecting three containers including the AM container. + waitForContainersLaunch(yarnClient, 3, appAttemptReportRef, + containersListRef, appAttemptIdRef, thrownError); + if (thrownError.get() != null) { + Assert.fail(thrownError.get().getMessage()); + } + ContainerId amContainerId = appAttemptReportRef.get().getAMContainerId(); + for (ContainerReport container : containersListRef.get()) { + if (!container.getContainerId().equals(amContainerId)) { + Assert.assertEquals(container.getExecutionType(), + ExecutionType.OPPORTUNISTIC); + } + } + } catch (Exception e) { + LOG.error("Job execution with enforce execution type failed.", e); + Assert.fail("Exception. 
" + e.getMessage()); + } finally { + if (yarnClient != null) { + yarnClient.stop(); + } + } + } + + @Test + public void testDistributedShellWithResources() throws Exception { + doTestDistributedShellWithResources(false); + } + + @Test + public void testDistributedShellWithResourcesWithLargeContainers() + throws Exception { + doTestDistributedShellWithResources(true); + } + + private void doTestDistributedShellWithResources(boolean largeContainers) + throws Exception { + AtomicReference thrownExceptionRef = + new AtomicReference<>(null); + AtomicReference> containersListRef = + new AtomicReference<>(null); + AtomicReference appAttemptIdRef = + new AtomicReference<>(null); + AtomicReference appAttemptReportRef = + new AtomicReference<>(null); + Resource clusterResource = getYarnCluster().getResourceManager() + .getResourceScheduler().getClusterResource(); + String masterMemoryString = "1 Gi"; + String containerMemoryString = "512 Mi"; + long[] memVars = {1024, 512}; + YarnClient yarnClient = null; + Assume.assumeTrue("The cluster doesn't have enough memory for this test", + clusterResource.getMemorySize() >= memVars[0] + memVars[1]); + Assume.assumeTrue("The cluster doesn't have enough cores for this test", + clusterResource.getVirtualCores() >= 2); + if (largeContainers) { + memVars[0] = clusterResource.getMemorySize() * 2 / 3; + memVars[0] = memVars[0] - memVars[0] % MIN_ALLOCATION_MB; + masterMemoryString = memVars[0] + "Mi"; + memVars[1] = clusterResource.getMemorySize() / 3; + memVars[1] = memVars[1] - memVars[1] % MIN_ALLOCATION_MB; + containerMemoryString = String.valueOf(memVars[1]); + } + + String[] args = createArgumentsWithAppName( + "--num_containers", + "2", + "--shell_command", + getListCommand(), + "--master_resources", + "memory=" + masterMemoryString + ",vcores=1", + "--container_resources", + "memory=" + containerMemoryString + ",vcores=1" + ); + + LOG.info("Initializing DS Client"); + setAndGetDSClient(new Configuration(getYarnClusterConfiguration())); + Assert.assertTrue(getDSClient().init(args)); + LOG.info("Running DS Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread dsClientRunner = new Thread(() -> { + try { + result.set(getDSClient().run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + dsClientRunner.start(); + try { + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(getYarnClusterConfiguration())); + yarnClient.start(); + // expecting two containers. 
+ waitForContainersLaunch(yarnClient, 2, appAttemptReportRef, + containersListRef, appAttemptIdRef, thrownExceptionRef); + if (thrownExceptionRef.get() != null) { + Assert.fail(thrownExceptionRef.get().getMessage()); + } + ContainerId amContainerId = appAttemptReportRef.get().getAMContainerId(); + ContainerReport report = yarnClient.getContainerReport(amContainerId); + Resource masterResource = report.getAllocatedResource(); + Assert.assertEquals(memVars[0], masterResource.getMemorySize()); + Assert.assertEquals(1, masterResource.getVirtualCores()); + for (ContainerReport container : containersListRef.get()) { + if (!container.getContainerId().equals(amContainerId)) { + Resource containerResource = container.getAllocatedResource(); + Assert.assertEquals(memVars[1], + containerResource.getMemorySize()); + Assert.assertEquals(1, containerResource.getVirtualCores()); + } + } + } finally { + LOG.info("Signaling Client to Stop"); + if (yarnClient != null) { + LOG.info("Stopping yarnClient service"); + yarnClient.stop(); + } + } + } + + @Test + public void testDSShellWithoutDomain() throws Exception { + baseTestDSShell(false); + } + + @Test + public void testDSShellWithoutDomainDefaultFlow() throws Exception { + baseTestDSShell(false, true); + } + + @Test + public void testDSShellWithoutDomainCustomizedFlow() throws Exception { + baseTestDSShell(false, false); + } + + @Override + protected String[] appendFlowArgsForTestDSShell(String[] args, + boolean defaultFlow) { + if (!defaultFlow) { + String[] flowArgs = { + "--flow_name", + "test_flow_name", + "--flow_version", + "test_flow_version", + "--flow_run_id", + "12345678" + }; + args = mergeArgs(args, flowArgs); + } + return args; + } + + @Override + protected void checkTimeline(ApplicationId appId, boolean defaultFlow, + boolean haveDomain, ApplicationReport appReport) throws Exception { + LOG.info("Started {}#checkTimeline()", getClass().getCanonicalName()); + // For PoC check using the file-based timeline writer (YARN-3264) + String tmpRoot = getTimelineV2StorageDir() + File.separator + "entities" + + File.separator; + + File tmpRootFolder = new File(tmpRoot); + try { + Assert.assertTrue(tmpRootFolder.isDirectory()); + String basePath = tmpRoot + + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator + + UserGroupInformation.getCurrentUser().getShortUserName() + + (defaultFlow ? + File.separator + appReport.getName() + File.separator + + TimelineUtils.DEFAULT_FLOW_VERSION + File.separator + + appReport.getStartTime() + File.separator : + File.separator + "test_flow_name" + File.separator + + "test_flow_version" + File.separator + "12345678" + + File.separator) + + appId.toString(); + LOG.info("basePath for appId {}: {}", appId, basePath); + // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs + + // Verify DS_APP_ATTEMPT entities posted by the client + // there will be at least one attempt, look for that file + String appTimestampFileName = + String.format("appattempt_%d_000%d_000001%s", + appId.getClusterTimestamp(), appId.getId(), + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION); + File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath, + "DS_APP_ATTEMPT", appTimestampFileName); + // Check if required events are published and same idprefix is sent for + // on each publish. 
+ verifyEntityForTimeline(dsAppAttemptEntityFile, + DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true); + // to avoid race condition of testcase, at least check 40 times with + // sleep of 50ms + verifyEntityForTimeline(dsAppAttemptEntityFile, + DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true); + + // Verify DS_CONTAINER entities posted by the client. + String containerTimestampFileName = + String.format("container_%d_000%d_01_000002.thist", + appId.getClusterTimestamp(), appId.getId()); + File dsContainerEntityFile = verifyEntityTypeFileExists(basePath, + "DS_CONTAINER", containerTimestampFileName); + // Check if required events are published and same idprefix is sent for + // on each publish. + verifyEntityForTimeline(dsContainerEntityFile, + DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true); + // to avoid race condition of testcase, at least check 40 times with + // sleep of 50ms. + verifyEntityForTimeline(dsContainerEntityFile, + DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true); + + // Verify NM posting container metrics info. + String containerMetricsTimestampFileName = + String.format("container_%d_000%d_01_000001%s", + appId.getClusterTimestamp(), appId.getId(), + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION); + File containerEntityFile = verifyEntityTypeFileExists(basePath, + TimelineEntityType.YARN_CONTAINER.toString(), + containerMetricsTimestampFileName); + verifyEntityForTimeline(containerEntityFile, + ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true); + + // to avoid race condition of testcase, at least check 40 times with + // sleep of 50ms + verifyEntityForTimeline(containerEntityFile, + ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true); + + // Verify RM posting Application life cycle Events are getting published + String appMetricsTimestampFileName = + String.format("application_%d_000%d%s", + appId.getClusterTimestamp(), appId.getId(), + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION); + File appEntityFile = + verifyEntityTypeFileExists(basePath, + TimelineEntityType.YARN_APPLICATION.toString(), + appMetricsTimestampFileName); + // No need to check idprefix for app. + verifyEntityForTimeline(appEntityFile, + ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false); + + // to avoid race condition of testcase, at least check 40 times with + // sleep of 50ms + verifyEntityForTimeline(appEntityFile, + ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false); + + // Verify RM posting AppAttempt life cycle Events are getting published + String appAttemptMetricsTimestampFileName = + String.format("appattempt_%d_000%d_000001%s", + appId.getClusterTimestamp(), appId.getId(), + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION); + + File appAttemptEntityFile = + verifyEntityTypeFileExists(basePath, + TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), + appAttemptMetricsTimestampFileName); + verifyEntityForTimeline(appAttemptEntityFile, + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true); + verifyEntityForTimeline(appAttemptEntityFile, + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true); + } finally { + try { + FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); + } catch (Exception ex) { + // the recursive delete can throw an exception when one of the file + // does not exist. + LOG.warn("Exception deleting a file/subDirectory: {}", ex.getMessage()); + } + } + } + + /** + * Checks the events and idprefix published for an entity. 
+ * + * @param entityFile Entity file. + * @param expectedEvent Expected event Id. + * @param numOfExpectedEvent Number of expected occurrences of expected event + * id. + * @param checkTimes Number of times to check. + * @param sleepTime Sleep time for each iteration. + * @param checkIdPrefix Whether to check idprefix. + * @throws IOException if entity file reading fails. + * @throws InterruptedException if sleep is interrupted. + */ + private void verifyEntityForTimeline(File entityFile, String expectedEvent, + long numOfExpectedEvent, int checkTimes, long sleepTime, + boolean checkIdPrefix) throws Exception { + AtomicReference thrownExceptionRef = new AtomicReference<>(null); + GenericTestUtils.waitFor(() -> { + String strLine; + long actualCount = 0; + long idPrefix = -1; + try (BufferedReader reader = + new BufferedReader(new FileReader(entityFile))) { + while ((strLine = reader.readLine()) != null) { + String entityLine = strLine.trim(); + if (entityLine.isEmpty()) { + continue; + } + if (entityLine.contains(expectedEvent)) { + actualCount++; + } + if (expectedEvent.equals(DSEvent.DS_CONTAINER_END.toString()) + && entityLine.contains(expectedEvent)) { + TimelineEntity entity = FileSystemTimelineReaderImpl. + getTimelineRecordFromJSON(entityLine, TimelineEntity.class); + TimelineEvent event = entity.getEvents().pollFirst(); + Assert.assertNotNull(event); + Assert.assertTrue("diagnostics", + event.getInfo().containsKey(ApplicationMaster.DIAGNOSTICS)); + } + if (checkIdPrefix) { + TimelineEntity entity = FileSystemTimelineReaderImpl. + getTimelineRecordFromJSON(entityLine, TimelineEntity.class); + Assert.assertTrue("Entity ID prefix expected to be > 0", + entity.getIdPrefix() > 0); + if (idPrefix == -1) { + idPrefix = entity.getIdPrefix(); + } else { + Assert.assertEquals( + "Entity ID prefix should be same across each publish of " + + "same entity", idPrefix, entity.getIdPrefix()); + } + } + } + } catch (Throwable e) { + LOG.error("Exception is waiting on application report", e); + thrownExceptionRef.set(e); + return true; + } + return (numOfExpectedEvent == actualCount); + }, sleepTime, (checkTimes + 1) * sleepTime); + + if (thrownExceptionRef.get() != null) { + Assert.fail("verifyEntityForTimeline failed " + + thrownExceptionRef.get().getMessage()); + } + } + + private File verifyEntityTypeFileExists(String basePath, String entityType, + String entityFileName) { + String outputDirPathForEntity = + basePath + File.separator + entityType + File.separator; + LOG.info("verifyEntityTypeFileExists output path for entityType {}: {}", + entityType, outputDirPathForEntity); + File outputDirForEntity = new File(outputDirPathForEntity); + Assert.assertTrue(outputDirForEntity.isDirectory()); + String entityFilePath = outputDirPathForEntity + entityFileName; + File entityFile = new File(entityFilePath); + Assert.assertTrue(entityFile.exists()); + return entityFile; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java index 00942195846..51afe83491a 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java @@ -17,74 +17,108 @@ */ package org.apache.hadoop.yarn.applications.distributedshell; + import java.io.IOException; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Arrays; import java.util.HashSet; -import java.util.Set; import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.Shell; -import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; public class TestDSWithMultipleNodeManager { private static final Logger LOG = LoggerFactory.getLogger(TestDSWithMultipleNodeManager.class); - static final int NUM_NMS = 2; - TestDistributedShell distShellTest; + private static final int NUM_NMS = 2; + @Rule + public TestName name = new TestName(); + @Rule + public Timeout globalTimeout = + new Timeout(DistributedShellBaseTest.TEST_TIME_OUT, + TimeUnit.MILLISECONDS); + private DistributedShellBaseTest distShellTest; + private Client dsClient; + + @BeforeClass + public static void setupUnitTests() throws Exception { + TestDSTimelineV10.setupUnitTests(); + } + + @AfterClass + public static void tearDownUnitTests() throws Exception { + TestDSTimelineV10.tearDownUnitTests(); + } @Before public void setup() throws Exception { - distShellTest = new TestDistributedShell(); - distShellTest.setupInternal(NUM_NMS); + distShellTest = new TestDSTimelineV10(); + distShellTest.setupInternal(NUM_NMS, new YarnConfiguration()); } @After public void tearDown() throws 
Exception { - distShellTest.tearDown(); + if (dsClient != null) { + dsClient.sendStopSignal(); + dsClient = null; + } + if (distShellTest != null) { + distShellTest.tearDown(); + distShellTest = null; + } } private void initializeNodeLabels() throws IOException { - RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext(); - + RMContext rmContext = distShellTest.getResourceManager(0).getRMContext(); // Setup node labels RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager(); - Set labels = new HashSet(); + Set labels = new HashSet<>(); labels.add("x"); labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(labels); // Setup queue access to node labels - distShellTest.conf.set(PREFIX + "root.accessible-node-labels", "x"); - distShellTest.conf.set(PREFIX + "root.accessible-node-labels.x.capacity", - "100"); - distShellTest.conf.set(PREFIX + "root.default.accessible-node-labels", "x"); - distShellTest.conf.set(PREFIX + distShellTest.setConfiguration(PREFIX + "root.accessible-node-labels", "x"); + distShellTest.setConfiguration( + PREFIX + "root.accessible-node-labels.x.capacity", "100"); + distShellTest.setConfiguration( + PREFIX + "root.default.accessible-node-labels", "x"); + distShellTest.setConfiguration(PREFIX + "root.default.accessible-node-labels.x.capacity", "100"); - rmContext.getScheduler().reinitialize(distShellTest.conf, rmContext); + rmContext.getScheduler().reinitialize(distShellTest.getConfiguration(), + rmContext); // Fetch node-ids from yarn cluster NodeId[] nodeIds = new NodeId[NUM_NMS]; for (int i = 0; i < NUM_NMS; i++) { - NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i); + NodeManager mgr = distShellTest.getNodeManager(i); nodeIds[i] = mgr.getNMContext().getNodeId(); } @@ -92,264 +126,312 @@ public class TestDSWithMultipleNodeManager { labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels)); } - @Test(timeout=90000) + @Test public void testDSShellWithNodeLabelExpression() throws Exception { + NMContainerMonitor containerMonitorRunner = null; initializeNodeLabels(); - // Start NMContainerMonitor - NMContainerMonitor mon = new NMContainerMonitor(); - Thread t = new Thread(mon); - t.start(); + try { + // Start NMContainerMonitor + containerMonitorRunner = new NMContainerMonitor(); + containerMonitorRunner.start(); - // Submit a job which will sleep for 60 sec - String[] args = { - "--jar", - TestDistributedShell.APPMASTER_JAR, - "--num_containers", - "4", - "--shell_command", - "sleep", - "--shell_args", - "15", - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1", - "--node_label_expression", - "x" - }; + // Submit a job which will sleep for 60 sec + String[] args = + DistributedShellBaseTest.createArguments(() -> generateAppName(), + "--num_containers", + "4", + "--shell_command", + "sleep", + "--shell_args", + "15", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--node_label_expression", + "x" + ); - LOG.info("Initializing DS Client"); - final Client client = - new Client(new Configuration(distShellTest.yarnCluster.getConfig())); - boolean initSuccess = client.init(args); - Assert.assertTrue(initSuccess); - LOG.info("Running DS Client"); - boolean result = client.run(); - LOG.info("Client run completed. 
Result=" + result); + LOG.info("Initializing DS Client"); + dsClient = + new Client( + new Configuration(distShellTest.getYarnClusterConfiguration())); + Assert.assertTrue(dsClient.init(args)); + LOG.info("Running DS Client"); + boolean result = dsClient.run(); + LOG.info("Client run completed. Result={}", result); - t.interrupt(); + containerMonitorRunner.stopMonitoring(); - // Check maximum number of containers on each NMs - int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); - // Check no container allocated on NM[0] - Assert.assertEquals(0, maxRunningContainersOnNMs[0]); - // Check there're some containers allocated on NM[1] - Assert.assertTrue(maxRunningContainersOnNMs[1] > 0); + // Check maximum number of containers on each NMs + int[] maxRunningContainersOnNMs = + containerMonitorRunner.getMaxRunningContainersReport(); + // Check no container allocated on NM[0] + Assert.assertEquals(0, maxRunningContainersOnNMs[0]); + // Check there are some containers allocated on NM[1] + Assert.assertTrue(maxRunningContainersOnNMs[1] > 0); + } finally { + if (containerMonitorRunner != null) { + containerMonitorRunner.stopMonitoring(); + containerMonitorRunner.join(); + } + } } - @Test(timeout = 90000) + @Test public void testDistributedShellWithPlacementConstraint() throws Exception { - NMContainerMonitor mon = new NMContainerMonitor(); - Thread t = new Thread(mon); - t.start(); + NMContainerMonitor containerMonitorRunner = null; + String[] args = + DistributedShellBaseTest.createArguments(() -> generateAppName(), + "1", + "--shell_command", + DistributedShellBaseTest.getSleepCommand(15), + "--placement_spec", + "zk(1),NOTIN,NODE,zk:spark(1),NOTIN,NODE,zk" + ); + try { + containerMonitorRunner = new NMContainerMonitor(); + containerMonitorRunner.start(); - String[] args = { - "--jar", - distShellTest.APPMASTER_JAR, - "1", - "--shell_command", - distShellTest.getSleepCommand(15), - "--placement_spec", - "zk(1),NOTIN,NODE,zk:spark(1),NOTIN,NODE,zk" - }; - LOG.info("Initializing DS Client"); - final Client client = - new Client(new Configuration(distShellTest.yarnCluster.getConfig())); - boolean initSuccess = client.init(args); - Assert.assertTrue(initSuccess); - LOG.info("Running DS Client"); - boolean result = client.run(); - LOG.info("Client run completed. Result=" + result); + LOG.info("Initializing DS Client with args {}", Arrays.toString(args)); + dsClient = + new Client( + new Configuration(distShellTest.getYarnClusterConfiguration())); + Assert.assertTrue(dsClient.init(args)); + LOG.info("Running DS Client"); + boolean result = dsClient.run(); + LOG.info("Client run completed. Result={}", result); - t.interrupt(); + containerMonitorRunner.stopMonitoring(); - ConcurrentMap apps = distShellTest.yarnCluster. 
- getResourceManager().getRMContext().getRMApps(); - RMApp app = apps.values().iterator().next(); - RMAppAttempt appAttempt = app.getAppAttempts().values().iterator().next(); - NodeId masterNodeId = appAttempt.getMasterContainer().getNodeId(); - NodeManager nm1 = distShellTest.yarnCluster.getNodeManager(0); + ConcurrentMap apps = + distShellTest.getResourceManager().getRMContext().getRMApps(); + RMApp app = apps.values().iterator().next(); + RMAppAttempt appAttempt = app.getAppAttempts().values().iterator().next(); + NodeId masterNodeId = appAttempt.getMasterContainer().getNodeId(); + NodeManager nm1 = distShellTest.getNodeManager(0); - int expectedNM1Count = 1; - int expectedNM2Count = 1; - if (nm1.getNMContext().getNodeId().equals(masterNodeId)) { - expectedNM1Count++; - } else { - expectedNM2Count++; + int[] expectedNMsCount = new int[]{1, 1}; + if (nm1.getNMContext().getNodeId().equals(masterNodeId)) { + expectedNMsCount[0]++; + } else { + expectedNMsCount[1]++; + } + + int[] maxRunningContainersOnNMs = + containerMonitorRunner.getMaxRunningContainersReport(); + Assert.assertEquals(expectedNMsCount[0], maxRunningContainersOnNMs[0]); + Assert.assertEquals(expectedNMsCount[1], maxRunningContainersOnNMs[1]); + } finally { + if (containerMonitorRunner != null) { + containerMonitorRunner.stopMonitoring(); + containerMonitorRunner.join(); + } } - - int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); - Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]); - Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]); } - @Test(timeout = 90000) + @Test public void testDistributedShellWithAllocationTagNamespace() throws Exception { - NMContainerMonitor mon = new NMContainerMonitor(); - Thread monitorThread = new Thread(mon); - monitorThread.start(); + NMContainerMonitor containerMonitorRunner = null; + Client clientB = null; + YarnClient yarnClient = null; - String[] argsA = { - "--jar", - distShellTest.APPMASTER_JAR, - "--shell_command", - distShellTest.getSleepCommand(30), - "--placement_spec", - "bar(1),notin,node,bar" - }; - final Client clientA = - new Client(new Configuration(distShellTest.yarnCluster.getConfig())); - clientA.init(argsA); - final AtomicBoolean resultA = new AtomicBoolean(false); - Thread t = new Thread() { - public void run() { + String[] argsA = + DistributedShellBaseTest.createArguments(() -> generateAppName("001"), + "--shell_command", + DistributedShellBaseTest.getSleepCommand(30), + "--placement_spec", + "bar(1),notin,node,bar" + ); + String[] argsB = + DistributedShellBaseTest.createArguments(() -> generateAppName("002"), + "1", + "--shell_command", + DistributedShellBaseTest.getListCommand(), + "--placement_spec", + "foo(3),notin,node,all/bar" + ); + + try { + containerMonitorRunner = new NMContainerMonitor(); + containerMonitorRunner.start(); + dsClient = + new Client( + new Configuration(distShellTest.getYarnClusterConfiguration())); + dsClient.init(argsA); + Thread dsClientRunner = new Thread(() -> { try { - resultA.set(clientA.run()); + dsClient.run(); } catch (Exception e) { throw new RuntimeException(e); } + }); + dsClientRunner.start(); + + NodeId taskContainerNodeIdA; + ConcurrentMap apps; + AtomicReference appARef = new AtomicReference<>(null); + AtomicReference masterContainerNodeIdARef = + new AtomicReference<>(null); + int[] expectedNMCounts = new int[]{0, 0}; + + waitForExpectedNMsCount(expectedNMCounts, appARef, + masterContainerNodeIdARef); + + NodeId nodeA = distShellTest.getNodeManager(0).getNMContext(). 
+ getNodeId(); + NodeId nodeB = distShellTest.getNodeManager(1).getNMContext(). + getNodeId(); + Assert.assertEquals(2, (expectedNMCounts[0] + expectedNMCounts[1])); + if (expectedNMCounts[0] != expectedNMCounts[1]) { + taskContainerNodeIdA = masterContainerNodeIdARef.get(); + } else { + taskContainerNodeIdA = + masterContainerNodeIdARef.get().equals(nodeA) ? nodeB : nodeA; } - }; - t.start(); - NodeId masterContainerNodeIdA; - NodeId taskContainerNodeIdA; - ConcurrentMap apps; - RMApp appA; - - int expectedNM1Count = 0; - int expectedNM2Count = 0; - while (true) { - if ((expectedNM1Count + expectedNM2Count) < 2) { - expectedNM1Count = distShellTest.yarnCluster.getNodeManager(0). - getNMContext().getContainers().size(); - expectedNM2Count = distShellTest.yarnCluster.getNodeManager(1). - getNMContext().getContainers().size(); - continue; + clientB = + new Client( + new Configuration(distShellTest.getYarnClusterConfiguration())); + clientB.init(argsB); + Assert.assertTrue(clientB.run()); + containerMonitorRunner.stopMonitoring(); + apps = distShellTest.getResourceManager().getRMContext().getRMApps(); + Iterator it = apps.values().iterator(); + RMApp appB = it.next(); + if (appARef.get().equals(appB)) { + appB = it.next(); } - apps = distShellTest.yarnCluster.getResourceManager().getRMContext(). - getRMApps(); - if (apps.isEmpty()) { - Thread.sleep(10); - continue; + LOG.info("Allocation Tag NameSpace Applications are={} and {}", + appARef.get().getApplicationId(), appB.getApplicationId()); + + RMAppAttempt appAttemptB = + appB.getAppAttempts().values().iterator().next(); + NodeId masterContainerNodeIdB = + appAttemptB.getMasterContainer().getNodeId(); + + if (nodeA.equals(masterContainerNodeIdB)) { + expectedNMCounts[0]++; + } else { + expectedNMCounts[1]++; } - appA = apps.values().iterator().next(); - if (appA.getAppAttempts().isEmpty()) { - Thread.sleep(10); - continue; + if (nodeA.equals(taskContainerNodeIdA)) { + expectedNMCounts[1] += 3; + } else { + expectedNMCounts[0] += 3; } - RMAppAttempt appAttemptA = appA.getAppAttempts().values().iterator(). - next(); - if (appAttemptA.getMasterContainer() == null) { - Thread.sleep(10); - continue; + int[] maxRunningContainersOnNMs = + containerMonitorRunner.getMaxRunningContainersReport(); + Assert.assertEquals(expectedNMCounts[0], maxRunningContainersOnNMs[0]); + Assert.assertEquals(expectedNMCounts[1], maxRunningContainersOnNMs[1]); + + try { + yarnClient = YarnClient.createYarnClient(); + yarnClient.init( + new Configuration(distShellTest.getYarnClusterConfiguration())); + yarnClient.start(); + yarnClient.killApplication(appARef.get().getApplicationId()); + } catch (Exception e) { + // Ignore Exception while killing a job + LOG.warn("Exception killing the job: {}", e.getMessage()); + } + } finally { + if (yarnClient != null) { + yarnClient.stop(); + } + if (clientB != null) { + clientB.sendStopSignal(); + } + if (containerMonitorRunner != null) { + containerMonitorRunner.stopMonitoring(); + containerMonitorRunner.join(); } - masterContainerNodeIdA = appAttemptA.getMasterContainer().getNodeId(); - break; - } - - NodeId nodeA = distShellTest.yarnCluster.getNodeManager(0).getNMContext(). - getNodeId(); - NodeId nodeB = distShellTest.yarnCluster.getNodeManager(1).getNMContext(). - getNodeId(); - Assert.assertEquals(2, (expectedNM1Count + expectedNM2Count)); - - if (expectedNM1Count != expectedNM2Count) { - taskContainerNodeIdA = masterContainerNodeIdA; - } else { - taskContainerNodeIdA = masterContainerNodeIdA.equals(nodeA) ? 
nodeB : - nodeA; - } - - String[] argsB = { - "--jar", - distShellTest.APPMASTER_JAR, - "1", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--placement_spec", - "foo(3),notin,node,all/bar" - }; - final Client clientB = new Client(new Configuration(distShellTest. - yarnCluster.getConfig())); - clientB.init(argsB); - boolean resultB = clientB.run(); - Assert.assertTrue(resultB); - - monitorThread.interrupt(); - apps = distShellTest.yarnCluster.getResourceManager().getRMContext(). - getRMApps(); - Iterator it = apps.values().iterator(); - RMApp appB = it.next(); - if (appA.equals(appB)) { - appB = it.next(); - } - LOG.info("Allocation Tag NameSpace Applications are=" + appA. - getApplicationId() + " and " + appB.getApplicationId()); - - RMAppAttempt appAttemptB = appB.getAppAttempts().values().iterator(). - next(); - NodeId masterContainerNodeIdB = appAttemptB.getMasterContainer(). - getNodeId(); - - if (nodeA.equals(masterContainerNodeIdB)) { - expectedNM1Count += 1; - } else { - expectedNM2Count += 1; - } - if (nodeA.equals(taskContainerNodeIdA)) { - expectedNM2Count += 3; - } else { - expectedNM1Count += 3; - } - int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); - Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]); - Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]); - - try { - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(new Configuration(distShellTest.yarnCluster. - getConfig())); - yarnClient.start(); - yarnClient.killApplication(appA.getApplicationId()); - } catch (Exception e) { - // Ignore Exception while killing a job } } + protected String generateAppName() { + return generateAppName(null); + } + + protected String generateAppName(String postFix) { + return name.getMethodName().replaceFirst("test", "") + .concat(postFix == null ? "" : "-" + postFix); + } + + private void waitForExpectedNMsCount(int[] expectedNMCounts, + AtomicReference appARef, + AtomicReference masterContainerNodeIdARef) throws Exception { + GenericTestUtils.waitFor(() -> { + if ((expectedNMCounts[0] + expectedNMCounts[1]) < 2) { + expectedNMCounts[0] = + distShellTest.getNodeManager(0).getNMContext() + .getContainers().size(); + expectedNMCounts[1] = + distShellTest.getNodeManager(1).getNMContext() + .getContainers().size(); + return false; + } + ConcurrentMap appIDsMap = + distShellTest.getResourceManager().getRMContext().getRMApps(); + if (appIDsMap.isEmpty()) { + return false; + } + appARef.set(appIDsMap.values().iterator().next()); + if (appARef.get().getAppAttempts().isEmpty()) { + return false; + } + RMAppAttempt appAttemptA = + appARef.get().getAppAttempts().values().iterator().next(); + if (appAttemptA.getMasterContainer() == null) { + return false; + } + masterContainerNodeIdARef.set( + appAttemptA.getMasterContainer().getNodeId()); + return true; + }, 10, 60000); + } + /** - * Monitor containers running on NMs + * Monitor containers running on NMs. 
*/ - class NMContainerMonitor implements Runnable { + class NMContainerMonitor extends Thread { // The interval of milliseconds of sampling (500ms) - final static int SAMPLING_INTERVAL_MS = 500; + private final static int SAMPLING_INTERVAL_MS = 500; // The maximum number of containers running on each NMs - int[] maxRunningContainersOnNMs = new int[NUM_NMS]; + private final int[] maxRunningContainersOnNMs = new int[NUM_NMS]; + private final Object quitSignal = new Object(); + private volatile boolean isRunning = true; @Override public void run() { - while (true) { + while (isRunning) { for (int i = 0; i < NUM_NMS; i++) { int nContainers = - distShellTest.yarnCluster.getNodeManager(i).getNMContext() + distShellTest.getNodeManager(i).getNMContext() .getContainers().size(); if (nContainers > maxRunningContainersOnNMs[i]) { maxRunningContainersOnNMs[i] = nContainers; } } - try { - Thread.sleep(SAMPLING_INTERVAL_MS); - } catch (InterruptedException e) { - e.printStackTrace(); - break; + synchronized (quitSignal) { + try { + if (!isRunning) { + break; + } + quitSignal.wait(SAMPLING_INTERVAL_MS); + } catch (InterruptedException e) { + LOG.warn("NMContainerMonitor interrupted"); + isRunning = false; + break; + } } } } @@ -357,5 +439,15 @@ public class TestDSWithMultipleNodeManager { public int[] getMaxRunningContainersReport() { return maxRunningContainersOnNMs; } + + public void stopMonitoring() { + if (!isRunning) { + return; + } + synchronized (quitSignal) { + isRunning = false; + quitSignal.notifyAll(); + } + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java deleted file mode 100644 index 4222fd0b566..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ /dev/null @@ -1,1862 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.applications.distributedshell; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.io.BufferedReader; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.IOException; -import java.io.OutputStream; -import java.io.PrintWriter; -import java.io.UncheckedIOException; -import java.net.URI; -import java.net.URL; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import java.util.function.Supplier; -import org.apache.commons.cli.MissingArgumentException; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.net.ServerSocketUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.JarFinder; -import org.apache.hadoop.util.Shell; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerReport; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.LogAggregationContext; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter; -import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient; -import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; -import org.apache.hadoop.yarn.client.api.impl.TimelineWriter; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; -import org.apache.hadoop.yarn.server.MiniYARNCluster; -import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; -import 
org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.timeline.NameValuePair; -import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; -import org.apache.hadoop.yarn.server.timeline.TimelineVersion; -import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher; -import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; -import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; -import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; -import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; -import org.apache.hadoop.yarn.util.Records; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestDistributedShell { - - private static final Logger LOG = - LoggerFactory.getLogger(TestDistributedShell.class); - - protected MiniYARNCluster yarnCluster = null; - protected MiniDFSCluster hdfsCluster = null; - private FileSystem fs = null; - private TimelineWriter spyTimelineWriter; - protected YarnConfiguration conf = null; - // location of the filesystem timeline writer for timeline service v.2 - private String timelineV2StorageDir = null; - private static final int NUM_NMS = 1; - private static final float DEFAULT_TIMELINE_VERSION = 1.0f; - private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector"; - private static final int MIN_ALLOCATION_MB = 128; - private static final int TEST_TIME_OUT = 150000; - // set the timeout of the yarnClient to be 95% of the globalTimeout. - private static final int TEST_TIME_WINDOW_EXPIRE = (TEST_TIME_OUT * 90) / 100; - - protected final static String APPMASTER_JAR = - JarFinder.getJar(ApplicationMaster.class); - - @Rule - public TimelineVersionWatcher timelineVersionWatcher - = new TimelineVersionWatcher(); - - @Rule - public Timeout globalTimeout = new Timeout(TEST_TIME_OUT, - TimeUnit.MILLISECONDS); - - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Rule - public TestName name = new TestName(); - - // set the timeout of the yarnClient to be 95% of the globalTimeout. 
- private final String yarnClientTimeout = - String.valueOf(TEST_TIME_WINDOW_EXPIRE); - - private final String[] commonArgs = { - "--jar", - APPMASTER_JAR, - "--timeout", - yarnClientTimeout, - "--appname", - "" - }; - - @Before - public void setup() throws Exception { - setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion()); - } - - protected void setupInternal(int numNodeManager) throws Exception { - setupInternal(numNodeManager, DEFAULT_TIMELINE_VERSION); - } - - private void setupInternal(int numNodeManager, float timelineVersion) - throws Exception { - LOG.info("Starting up YARN cluster"); - - conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - MIN_ALLOCATION_MB); - // reduce the teardown waiting time - conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000); - conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 500); - conf.set("yarn.log.dir", "target"); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - // mark if we need to launch the v1 timeline server - // disable aux-service based timeline aggregators - conf.set(YarnConfiguration.NM_AUX_SERVICES, ""); - conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); - - conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8"); - conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); - conf.set("mapreduce.jobhistory.address", - "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10)); - // Enable ContainersMonitorImpl - conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, - LinuxResourceCalculatorPlugin.class.getName()); - conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, - ProcfsBasedProcessTree.class.getName()); - conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true); - conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); - conf.setBoolean( - YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, - true); - conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, - true); - conf.setBoolean( - YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); - conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, - 10); - conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, - YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); - // ATS version specific settings - if (timelineVersion == 1.0f) { - conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); - conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, - CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT); - } else if (timelineVersion == 1.5f) { - HdfsConfiguration hdfsConfig = new HdfsConfiguration(); - hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig) - .numDataNodes(1).build(); - hdfsCluster.waitActive(); - fs = hdfsCluster.getFileSystem(); - PluginStoreTestUtils.prepareFileSystemForPluginStore(fs); - PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster); - conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES, - DistributedShellTimelinePlugin.class.getName()); - } else if (timelineVersion == 2.0f) { - // set version to 2 - conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); - // disable v1 timeline server since we no longer have a server here - // enable aux-service based timeline aggregators - conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); - conf.set(YarnConfiguration.NM_AUX_SERVICES + "." 
+ - TIMELINE_AUX_SERVICE_NAME + ".class", - PerNodeTimelineCollectorsAuxService.class.getName()); - conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, - FileSystemTimelineWriterImpl.class, - org.apache.hadoop.yarn.server.timelineservice.storage. - TimelineWriter.class); - timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath(); - // set the file system timeline writer storage directory - conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT, - timelineV2StorageDir); - } else { - Assert.fail("Wrong timeline version number: " + timelineVersion); - } - - yarnCluster = - new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1, - numNodeManager, 1, 1); - yarnCluster.init(conf); - yarnCluster.start(); - - conf.set( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - MiniYARNCluster.getHostname() + ":" - + yarnCluster.getApplicationHistoryServer().getPort()); - - waitForNMsToRegister(); - - URL url = Thread.currentThread().getContextClassLoader().getResource( - "yarn-site.xml"); - if (url == null) { - throw new RuntimeException( - "Could not find 'yarn-site.xml' dummy file in classpath"); - } - Configuration yarnClusterConfig = yarnCluster.getConfig(); - yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, - new File(url.getPath()).getParent()); - //write the document to a buffer (not directly to the file, as that - //can cause the file being written to get read -which will then fail. - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - yarnClusterConfig.writeXml(bytesOut); - bytesOut.close(); - //write the bytes to the file in the classpath - OutputStream os = new FileOutputStream(url.getPath()); - os.write(bytesOut.toByteArray()); - os.close(); - - FileContext fsContext = FileContext.getLocalFSFileContext(); - fsContext - .delete( - new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), - true); - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - LOG.info("setup thread sleep interrupted. 
message=" + e.getMessage()); - } - } - - @After - public void tearDown() throws IOException { - FileContext fsContext = FileContext.getLocalFSFileContext(); - fsContext - .delete( - new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), - true); - if (yarnCluster != null) { - try { - yarnCluster.stop(); - } finally { - yarnCluster = null; - } - } - if (hdfsCluster != null) { - try { - hdfsCluster.shutdown(); - } finally { - hdfsCluster = null; - } - } - } - - @Test - public void testDSShellWithDomain() throws Exception { - testDSShell(true); - } - - @Test - public void testDSShellWithoutDomain() throws Exception { - testDSShell(false); - } - - @Test - @TimelineVersion(1.5f) - public void testDSShellWithoutDomainV1_5() throws Exception { - testDSShell(false); - } - - @Test - @TimelineVersion(1.5f) - public void testDSShellWithDomainV1_5() throws Exception { - testDSShell(true); - } - - @Test - @TimelineVersion(2.0f) - public void testDSShellWithoutDomainV2() throws Exception { - testDSShell(false); - } - - public void testDSShell(boolean haveDomain) throws Exception { - testDSShell(haveDomain, true); - } - - @Test - @TimelineVersion(2.0f) - public void testDSShellWithoutDomainV2DefaultFlow() throws Exception { - testDSShell(false, true); - } - - @Test - @TimelineVersion(2.0f) - public void testDSShellWithoutDomainV2CustomizedFlow() throws Exception { - testDSShell(false, false); - } - - public void testDSShell(boolean haveDomain, boolean defaultFlow) - throws Exception { - String[] args = createArguments( - "--num_containers", - "2", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1"); - - if (haveDomain) { - String[] domainArgs = { - "--domain", - "TEST_DOMAIN", - "--view_acls", - "reader_user reader_group", - "--modify_acls", - "writer_user writer_group", - "--create" - }; - args = mergeArgs(args, domainArgs); - } - boolean isTestingTimelineV2 = false; - if (timelineVersionWatcher.getTimelineVersion() == 2.0f) { - isTestingTimelineV2 = true; - if (!defaultFlow) { - String[] flowArgs = { - "--flow_name", - "test_flow_name", - "--flow_version", - "test_flow_version", - "--flow_run_id", - "12345678" - }; - args = mergeArgs(args, flowArgs); - } - LOG.info("Setup: Using timeline v2!"); - } - - LOG.info("Initializing DS Client"); - YarnClient yarnClient; - final Client client = new Client(new Configuration(yarnCluster.getConfig())); - boolean initSuccess = client.init(args); - Assert.assertTrue(initSuccess); - LOG.info("Running DS Client"); - final AtomicBoolean result = new AtomicBoolean(false); - Thread t = new Thread() { - public void run() { - try { - result.set(client.run()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - t.start(); - - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(new Configuration(yarnCluster.getConfig())); - yarnClient.start(); - - boolean verified = false; - String errorMessage = ""; - ApplicationId appId = null; - ApplicationReport appReport = null; - while (!verified) { - List apps = yarnClient.getApplications(); - if (apps.size() == 0) { - Thread.sleep(10); - continue; - } - appReport = apps.get(0); - appId = appReport.getApplicationId(); - if (appReport.getHost().equals("N/A")) { - Thread.sleep(10); - continue; - } - errorMessage = - "'. 
Expected rpc port to be '-1', was '" - + appReport.getRpcPort() + "'."; - if (appReport.getRpcPort() == -1) { - verified = true; - } - - if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED - && appReport.getFinalApplicationStatus() != - FinalApplicationStatus.UNDEFINED) { - break; - } - } - Assert.assertTrue(errorMessage, verified); - t.join(); - LOG.info("Client run completed for testDSShell. Result=" + result); - Assert.assertTrue(result.get()); - - if (timelineVersionWatcher.getTimelineVersion() == 1.5f) { - long scanInterval = conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS, - YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT - ); - Path doneDir = new Path( - YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT - ); - // Wait till the data is moved to done dir, or timeout and fail - while (true) { - RemoteIterator iterApps = fs.listStatusIterator(doneDir); - if (iterApps.hasNext()) { - break; - } - Thread.sleep(scanInterval * 2); - } - } - - if (!isTestingTimelineV2) { - checkTimelineV1(haveDomain); - } else { - checkTimelineV2(appId, defaultFlow, appReport); - } - } - - private void checkTimelineV1(boolean haveDomain) throws Exception { - TimelineDomain domain = null; - if (haveDomain) { - domain = yarnCluster.getApplicationHistoryServer() - .getTimelineStore().getDomain("TEST_DOMAIN"); - Assert.assertNotNull(domain); - Assert.assertEquals("reader_user reader_group", domain.getReaders()); - Assert.assertEquals("writer_user writer_group", domain.getWriters()); - } - TimelineEntities entitiesAttempts = yarnCluster - .getApplicationHistoryServer() - .getTimelineStore() - .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(), - null, null, null, null, null, null, null, null, null); - Assert.assertNotNull(entitiesAttempts); - Assert.assertEquals(1, entitiesAttempts.getEntities().size()); - Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents() - .size()); - Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType(), - ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString()); - if (haveDomain) { - Assert.assertEquals(domain.getId(), - entitiesAttempts.getEntities().get(0).getDomainId()); - } else { - Assert.assertEquals("DEFAULT", - entitiesAttempts.getEntities().get(0).getDomainId()); - } - String currAttemptEntityId - = entitiesAttempts.getEntities().get(0).getEntityId(); - ApplicationAttemptId attemptId = ApplicationAttemptId.fromString( - currAttemptEntityId); - NameValuePair primaryFilter = new NameValuePair( - ApplicationMaster.APPID_TIMELINE_FILTER_NAME, - attemptId.getApplicationId().toString()); - TimelineEntities entities = yarnCluster - .getApplicationHistoryServer() - .getTimelineStore() - .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null, - null, null, null, null, primaryFilter, null, null, null); - Assert.assertNotNull(entities); - Assert.assertEquals(2, entities.getEntities().size()); - Assert.assertEquals(entities.getEntities().get(0).getEntityType(), - ApplicationMaster.DSEntity.DS_CONTAINER.toString()); - - String entityId = entities.getEntities().get(0).getEntityId(); - org.apache.hadoop.yarn.api.records.timeline.TimelineEntity entity = - yarnCluster.getApplicationHistoryServer().getTimelineStore() - .getEntity(entityId, - ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null); - Assert.assertNotNull(entity); - Assert.assertEquals(entityId, entity.getEntityId()); - - if (haveDomain) { - 
Assert.assertEquals(domain.getId(), - entities.getEntities().get(0).getDomainId()); - } else { - Assert.assertEquals("DEFAULT", - entities.getEntities().get(0).getDomainId()); - } - } - - private void checkTimelineV2(ApplicationId appId, - boolean defaultFlow, ApplicationReport appReport) throws Exception { - LOG.info("Started checkTimelineV2 "); - // For PoC check using the file-based timeline writer (YARN-3264) - String tmpRoot = timelineV2StorageDir + File.separator + "entities" + - File.separator; - - File tmpRootFolder = new File(tmpRoot); - try { - Assert.assertTrue(tmpRootFolder.isDirectory()); - String basePath = tmpRoot + - YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator + - UserGroupInformation.getCurrentUser().getShortUserName() + - (defaultFlow ? - File.separator + appReport.getName() + File.separator + - TimelineUtils.DEFAULT_FLOW_VERSION + File.separator + - appReport.getStartTime() + File.separator : - File.separator + "test_flow_name" + File.separator + - "test_flow_version" + File.separator + "12345678" + - File.separator) + - appId.toString(); - LOG.info("basePath: " + basePath); - // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs - - // Verify DS_APP_ATTEMPT entities posted by the client - // there will be at least one attempt, look for that file - String appTimestampFileName = - "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId() - + "_000001" - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath, - "DS_APP_ATTEMPT", appTimestampFileName); - // Check if required events are published and same idprefix is sent for - // on each publish. - verifyEntityForTimelineV2(dsAppAttemptEntityFile, - DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true); - // to avoid race condition of testcase, atleast check 40 times with sleep - // of 50ms - verifyEntityForTimelineV2(dsAppAttemptEntityFile, - DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true); - - // Verify DS_CONTAINER entities posted by the client. - String containerTimestampFileName = - "container_" + appId.getClusterTimestamp() + "_000" + appId.getId() - + "_01_000002.thist"; - File dsContainerEntityFile = verifyEntityTypeFileExists(basePath, - "DS_CONTAINER", containerTimestampFileName); - // Check if required events are published and same idprefix is sent for - // on each publish. - verifyEntityForTimelineV2(dsContainerEntityFile, - DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true); - // to avoid race condition of testcase, atleast check 40 times with sleep - // of 50ms - verifyEntityForTimelineV2(dsContainerEntityFile, - DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true); - - // Verify NM posting container metrics info. 
- String containerMetricsTimestampFileName = - "container_" + appId.getClusterTimestamp() + "_000" + appId.getId() - + "_01_000001" - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - File containerEntityFile = verifyEntityTypeFileExists(basePath, - TimelineEntityType.YARN_CONTAINER.toString(), - containerMetricsTimestampFileName); - verifyEntityForTimelineV2(containerEntityFile, - ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true); - - // to avoid race condition of testcase, atleast check 40 times with sleep - // of 50ms - verifyEntityForTimelineV2(containerEntityFile, - ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true); - - // Verify RM posting Application life cycle Events are getting published - String appMetricsTimestampFileName = - "application_" + appId.getClusterTimestamp() + "_000" + appId.getId() - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - File appEntityFile = - verifyEntityTypeFileExists(basePath, - TimelineEntityType.YARN_APPLICATION.toString(), - appMetricsTimestampFileName); - // No need to check idprefix for app. - verifyEntityForTimelineV2(appEntityFile, - ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false); - - // to avoid race condition of testcase, atleast check 40 times with sleep - // of 50ms - verifyEntityForTimelineV2(appEntityFile, - ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false); - - // Verify RM posting AppAttempt life cycle Events are getting published - String appAttemptMetricsTimestampFileName = - "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId() - + "_000001" - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - File appAttemptEntityFile = - verifyEntityTypeFileExists(basePath, - TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), - appAttemptMetricsTimestampFileName); - verifyEntityForTimelineV2(appAttemptEntityFile, - AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true); - verifyEntityForTimelineV2(appAttemptEntityFile, - AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true); - } finally { - try { - FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); - } catch (FileNotFoundException ex) { - // the recursive delete can throw an exception when one of the file - // does not exist. - LOG.warn("Exception deleting a file/subDirectory: {}", ex.getMessage()); - } - } - } - - private File verifyEntityTypeFileExists(String basePath, String entityType, - String entityfileName) { - String outputDirPathForEntity = - basePath + File.separator + entityType + File.separator; - LOG.info(outputDirPathForEntity); - File outputDirForEntity = new File(outputDirPathForEntity); - Assert.assertTrue(outputDirForEntity.isDirectory()); - - String entityFilePath = outputDirPathForEntity + entityfileName; - - File entityFile = new File(entityFilePath); - Assert.assertTrue(entityFile.exists()); - return entityFile; - } - - /** - * Checks the events and idprefix published for an entity. - * - * @param entityFile Entity file. - * @param expectedEvent Expected event Id. - * @param numOfExpectedEvent Number of expected occurences of expected event - * id. - * @param checkTimes Number of times to check. - * @param sleepTime Sleep time for each iteration. - * @param checkIdPrefix Whether to check idprefix. - * @throws IOException if entity file reading fails. - * @throws InterruptedException if sleep is interrupted. 
- */ - private void verifyEntityForTimelineV2(File entityFile, String expectedEvent, - long numOfExpectedEvent, int checkTimes, long sleepTime, - boolean checkIdPrefix) throws IOException, InterruptedException { - long actualCount = 0; - for (int i = 0; i < checkTimes; i++) { - BufferedReader reader = null; - String strLine; - actualCount = 0; - try { - reader = new BufferedReader(new FileReader(entityFile)); - long idPrefix = -1; - while ((strLine = reader.readLine()) != null) { - String entityLine = strLine.trim(); - if (entityLine.isEmpty()) { - continue; - } - if (entityLine.contains(expectedEvent)) { - actualCount++; - } - if (expectedEvent.equals(DSEvent.DS_CONTAINER_END.toString()) && - entityLine.contains(expectedEvent)) { - TimelineEntity entity = FileSystemTimelineReaderImpl. - getTimelineRecordFromJSON(entityLine, TimelineEntity.class); - TimelineEvent event = entity.getEvents().pollFirst(); - Assert.assertNotNull(event); - Assert.assertTrue("diagnostics", - event.getInfo().containsKey(ApplicationMaster.DIAGNOSTICS)); - } - if (checkIdPrefix) { - TimelineEntity entity = FileSystemTimelineReaderImpl. - getTimelineRecordFromJSON(entityLine, TimelineEntity.class); - Assert.assertTrue("Entity ID prefix expected to be > 0", - entity.getIdPrefix() > 0); - if (idPrefix == -1) { - idPrefix = entity.getIdPrefix(); - } else { - Assert.assertEquals("Entity ID prefix should be same across " + - "each publish of same entity", - idPrefix, entity.getIdPrefix()); - } - } - } - } finally { - if (reader != null) { - reader.close(); - } - } - if (numOfExpectedEvent == actualCount) { - break; - } - if (sleepTime > 0 && i < checkTimes - 1) { - Thread.sleep(sleepTime); - } - } - Assert.assertEquals("Unexpected number of " + expectedEvent + - " event published.", numOfExpectedEvent, actualCount); - } - - /** - * Utility function to merge two String arrays to form a new String array for - * our argumemts. - * - * @param args the first set of the arguments. - * @param newArgs the second set of the arguments. - * @return a String array consists of {args, newArgs} - */ - private String[] mergeArgs(String[] args, String[] newArgs) { - int length = args.length + newArgs.length; - String[] result = new String[length]; - System.arraycopy(args, 0, result, 0, args.length); - System.arraycopy(newArgs, 0, result, args.length, newArgs.length); - return result; - } - - private String generateAppName(String postFix) { - return name.getMethodName().replaceFirst("test", "") - .concat(postFix == null? "" : "-" + postFix); - } - - private String[] createArguments(String... args) { - String[] res = mergeArgs(commonArgs, args); - // set the application name so we can track down which command is running. - res[commonArgs.length - 1] = generateAppName(null); - return res; - } - - private String[] createArgsWithPostFix(int index, String... args) { - String[] res = mergeArgs(commonArgs, args); - // set the application name so we can track down which command is running. - res[commonArgs.length - 1] = generateAppName(String.valueOf(index)); - return res; - } - - protected String getSleepCommand(int sec) { - // Windows doesn't have a sleep command, ping -n does the trick - return Shell.WINDOWS ? 
"ping -n " + (sec + 1) + " 127.0.0.1 >nul" - : "sleep " + sec; - } - - @Test - public void testDSRestartWithPreviousRunningContainers() throws Exception { - String[] args = createArguments( - "--num_containers", - "1", - "--shell_command", - getSleepCommand(8), - "--master_memory", - "512", - "--container_memory", - "128", - "--keep_containers_across_application_attempts" - ); - - LOG.info("Initializing DS Client"); - Client client = new Client(TestDSFailedAppMaster.class.getName(), - new Configuration(yarnCluster.getConfig())); - - client.init(args); - - LOG.info("Running DS Client"); - boolean result = client.run(); - LOG.info("Client run completed. Result=" + result); - // application should succeed - Assert.assertTrue(result); - } - - /* - * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds. - * Set attempt_failures_validity_interval as 2.5 seconds. It will check - * how many attempt failures for previous 2.5 seconds. - * The application is expected to be successful. - */ - @Test - public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { - String[] args = createArguments( - "--num_containers", - "1", - "--shell_command", - getSleepCommand(8), - "--master_memory", - "512", - "--container_memory", - "128", - "--attempt_failures_validity_interval", - "2500" - ); - - LOG.info("Initializing DS Client"); - Configuration config = yarnCluster.getConfig(); - config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - Client client = new Client(TestDSSleepingAppMaster.class.getName(), - new Configuration(config)); - - client.init(args); - - LOG.info("Running DS Client"); - boolean result = client.run(); - - LOG.info("Client run completed. Result=" + result); - // application should succeed - Assert.assertTrue(result); - } - - /* - * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds. - * Set attempt_failures_validity_interval as 15 seconds. It will check - * how many attempt failure for previous 15 seconds. - * The application is expected to be fail. - */ - @Test - public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { - String[] args = createArguments( - "--num_containers", - "1", - "--shell_command", - getSleepCommand(8), - "--master_memory", - "512", - "--container_memory", - "128", - "--attempt_failures_validity_interval", - "15000" - ); - - LOG.info("Initializing DS Client"); - Configuration config = yarnCluster.getConfig(); - config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - Client client = new Client(TestDSSleepingAppMaster.class.getName(), - new Configuration(config)); - - client.init(args); - - LOG.info("Running DS Client"); - boolean result = client.run(); - - LOG.info("Client run completed. 
Result=" + result); - // application should be failed - Assert.assertFalse(result); - } - - @Test - public void testDSShellWithCustomLogPropertyFile() throws Exception { - final File basedir = - new File("target", TestDistributedShell.class.getName()); - final File tmpDir = new File(basedir, "tmpDir"); - tmpDir.mkdirs(); - final File customLogProperty = new File(tmpDir, "custom_log4j.properties"); - if (customLogProperty.exists()) { - customLogProperty.delete(); - } - if(!customLogProperty.createNewFile()) { - Assert.fail("Can not create custom log4j property file."); - } - PrintWriter fileWriter = new PrintWriter(customLogProperty); - // set the output to DEBUG level - fileWriter.write("log4j.rootLogger=debug,stdout"); - fileWriter.close(); - String[] args = createArguments( - "--num_containers", - "3", - "--shell_command", - "echo", - "--shell_args", - "HADOOP", - "--log_properties", - customLogProperty.getAbsolutePath(), - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1" - ); - - //Before run the DS, the default the log level is INFO - final Logger LOG_Client = - LoggerFactory.getLogger(Client.class); - Assert.assertTrue(LOG_Client.isInfoEnabled()); - Assert.assertFalse(LOG_Client.isDebugEnabled()); - final Logger LOG_AM = LoggerFactory.getLogger(ApplicationMaster.class); - Assert.assertTrue(LOG_AM.isInfoEnabled()); - Assert.assertFalse(LOG_AM.isDebugEnabled()); - - LOG.info("Initializing DS Client"); - final Client client = - new Client(new Configuration(yarnCluster.getConfig())); - boolean initSuccess = client.init(args); - Assert.assertTrue(initSuccess); - - LOG.info("Running DS Client"); - boolean result = client.run(); - LOG.info("Client run completed. Result=" + result); - Assert.assertTrue(verifyContainerLog(3, null, true, "DEBUG") > 10); - //After DS is finished, the log level should be DEBUG - Assert.assertTrue(LOG_Client.isInfoEnabled()); - Assert.assertTrue(LOG_Client.isDebugEnabled()); - Assert.assertTrue(LOG_AM.isInfoEnabled()); - Assert.assertTrue(LOG_AM.isDebugEnabled()); - } - - @Test - public void testSpecifyingLogAggregationContext() throws Exception { - String regex = ".*(foo|bar)\\d"; - String[] args = createArguments( - "--shell_command", - "echo", - "--rolling_log_pattern", - regex - ); - final Client client = - new Client(new Configuration(yarnCluster.getConfig())); - Assert.assertTrue(client.init(args)); - - ApplicationSubmissionContext context = - Records.newRecord(ApplicationSubmissionContext.class); - client.specifyLogAggregationContext(context); - LogAggregationContext logContext = context.getLogAggregationContext(); - assertEquals(logContext.getRolledLogsIncludePattern(), regex); - assertTrue(logContext.getRolledLogsExcludePattern().isEmpty()); - } - - public void testDSShellWithCommands() throws Exception { - - String[] args = createArguments( - "--num_containers", - "2", - "--shell_command", - "\"echo output_ignored;echo output_expected\"", - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1" - ); - - LOG.info("Initializing DS Client"); - final Client client = - new Client(new Configuration(yarnCluster.getConfig())); - boolean initSuccess = client.init(args); - Assert.assertTrue(initSuccess); - LOG.info("Running DS Client"); - try { - boolean result = client.run(); - LOG.info("Client run completed. 
Result=" + result); - List expectedContent = new ArrayList<>(); - expectedContent.add("output_expected"); - verifyContainerLog(2, expectedContent, false, ""); - } finally { - client.sendStopSignal(); - } - } - - @Test - public void testDSShellWithMultipleArgs() throws Exception { - String[] args = createArguments( - "--num_containers", - "4", - "--shell_command", - "echo", - "--shell_args", - "HADOOP YARN MAPREDUCE HDFS", - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1" - ); - - LOG.info("Initializing DS Client"); - final Client client = - new Client(new Configuration(yarnCluster.getConfig())); - boolean initSuccess = client.init(args); - Assert.assertTrue(initSuccess); - LOG.info("Running DS Client"); - - boolean result = client.run(); - LOG.info("Client run completed. Result=" + result); - List expectedContent = new ArrayList<>(); - expectedContent.add("HADOOP YARN MAPREDUCE HDFS"); - verifyContainerLog(4, expectedContent, false, ""); - } - - @Test - public void testDSShellWithShellScript() throws Exception { - final File basedir = - new File("target", TestDistributedShell.class.getName()); - final File tmpDir = new File(basedir, "tmpDir"); - tmpDir.mkdirs(); - final File customShellScript = new File(tmpDir, "custom_script.sh"); - if (customShellScript.exists()) { - customShellScript.delete(); - } - if (!customShellScript.createNewFile()) { - Assert.fail("Can not create custom shell script file."); - } - PrintWriter fileWriter = new PrintWriter(customShellScript); - // set the output to DEBUG level - fileWriter.write("echo testDSShellWithShellScript"); - fileWriter.close(); - LOG.info(customShellScript.getAbsolutePath()); - String[] args = createArguments( - "--num_containers", - "1", - "--shell_script", - customShellScript.getAbsolutePath(), - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1" - ); - - LOG.info("Initializing DS Client"); - final Client client = - new Client(new Configuration(yarnCluster.getConfig())); - boolean initSuccess = client.init(args); - Assert.assertTrue(initSuccess); - LOG.info("Running DS Client"); - boolean result = client.run(); - LOG.info("Client run completed. Result=" + result); - List expectedContent = new ArrayList<>(); - expectedContent.add("testDSShellWithShellScript"); - verifyContainerLog(1, expectedContent, false, ""); - } - - @Test - public void testDSShellWithInvalidArgs() throws Exception { - Client client = new Client(new Configuration(yarnCluster.getConfig())); - int appNameCounter = 0; - LOG.info("Initializing DS Client with no args"); - try { - client.init(new String[]{}); - Assert.fail("Exception is expected"); - } catch (IllegalArgumentException e) { - Assert.assertTrue("The throw exception is not expected", - e.getMessage().contains("No args")); - } - - LOG.info("Initializing DS Client with no jar file"); - try { - String[] args = createArgsWithPostFix(appNameCounter++, - "--num_containers", - "2", - "--shell_command", - Shell.WINDOWS ? 
"dir" : "ls", - "--master_memory", - "512", - "--container_memory", - "128" - ); - String[] argsNoJar = Arrays.copyOfRange(args, 2, args.length); - client.init(argsNoJar); - Assert.fail("Exception is expected"); - } catch (IllegalArgumentException e) { - Assert.assertTrue("The throw exception is not expected", - e.getMessage().contains("No jar")); - } - - LOG.info("Initializing DS Client with no shell command"); - try { - String[] args = createArgsWithPostFix(appNameCounter++, - "--num_containers", - "2", - "--master_memory", - "512", - "--container_memory", - "128" - ); - client.init(args); - Assert.fail("Exception is expected"); - } catch (IllegalArgumentException e) { - Assert.assertTrue("The throw exception is not expected", - e.getMessage().contains("No shell command")); - } - - LOG.info("Initializing DS Client with invalid no. of containers"); - try { - String[] args = createArgsWithPostFix(appNameCounter++, - "--num_containers", - "-1", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--master_memory", - "512", - "--container_memory", - "128" - ); - client.init(args); - Assert.fail("Exception is expected"); - } catch (IllegalArgumentException e) { - Assert.assertTrue("The throw exception is not expected", - e.getMessage().contains("Invalid no. of containers")); - } - - LOG.info("Initializing DS Client with invalid no. of vcores"); - try { - String[] args = createArgsWithPostFix(appNameCounter++, - "--num_containers", - "2", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--master_memory", - "512", - "--master_vcores", - "-2", - "--container_memory", - "128", - "--container_vcores", - "1" - ); - client.init(args); - client.run(); - Assert.fail("Exception is expected"); - } catch (IllegalArgumentException e) { - Assert.assertTrue("The throw exception is not expected", - e.getMessage().contains("Invalid virtual cores specified")); - } - - LOG.info("Initializing DS Client with --shell_command and --shell_script"); - try { - String[] args = createArgsWithPostFix(appNameCounter++, - "--num_containers", - "2", - "--shell_command", - Shell.WINDOWS ? 
"dir" : "ls", - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1", - "--shell_script", - "test.sh" - ); - client.init(args); - Assert.fail("Exception is expected"); - } catch (IllegalArgumentException e) { - Assert.assertTrue("The throw exception is not expected", - e.getMessage().contains("Can not specify shell_command option " + - "and shell_script option at the same time")); - } - - LOG.info("Initializing DS Client without --shell_command and --shell_script"); - try { - String[] args = createArgsWithPostFix(appNameCounter++, - "--num_containers", - "2", - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1" - ); - client.init(args); - Assert.fail("Exception is expected"); - } catch (IllegalArgumentException e) { - Assert.assertTrue("The throw exception is not expected", - e.getMessage().contains("No shell command or shell script specified " + - "to be executed by application master")); - } - - LOG.info("Initializing DS Client with invalid container_type argument"); - try { - String[] args = createArgsWithPostFix(appNameCounter++, - "--num_containers", - "2", - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1", - "--shell_command", - "date", - "--container_type", - "UNSUPPORTED_TYPE" - ); - client.init(args); - Assert.fail("Exception is expected"); - } catch (IllegalArgumentException e) { - Assert.assertTrue("The throw exception is not expected", - e.getMessage().contains("Invalid container_type: UNSUPPORTED_TYPE")); - } - - try { - String[] args = createArgsWithPostFix(appNameCounter++, - "--num_containers", - "1", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--master_resources", - "memory-mb=invalid" - ); - client.init(args); - Assert.fail("Exception is expected"); - } catch (IllegalArgumentException e) { - // do nothing - LOG.info("IllegalArgumentException exception is expected: {}", - e.getMessage()); - } - - try { - String[] args = createArgsWithPostFix(appNameCounter++, - "--num_containers", - "1", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--master_resources" - ); - client.init(args); - Assert.fail("Exception is expected"); - } catch (MissingArgumentException e) { - // do nothing - LOG.info("MissingArgumentException exception is expected: {}", - e.getMessage()); - } - } - - @Test - public void testDSTimelineClientWithConnectionRefuse() throws Exception { - ApplicationMaster am = new ApplicationMaster(); - - TimelineClientImpl client = new TimelineClientImpl() { - @Override - protected TimelineWriter createTimelineWriter(Configuration conf, - UserGroupInformation authUgi, com.sun.jersey.api.client.Client client, - URI resURI) throws IOException { - TimelineWriter timelineWriter = - new DirectTimelineWriter(authUgi, client, resURI); - spyTimelineWriter = spy(timelineWriter); - return spyTimelineWriter; - } - }; - client.init(conf); - client.start(); - TestTimelineClient.mockEntityClientResponse(spyTimelineWriter, null, - false, true); - try { - UserGroupInformation ugi = mock(UserGroupInformation.class); - when(ugi.getShortUserName()).thenReturn("user1"); - // verify no ClientHandlerException get thrown out. 
- am.publishContainerEndEvent(client, ContainerStatus.newInstance( - BuilderUtils.newContainerId(1, 1, 1, 1), ContainerState.COMPLETE, "", - 1), "domainId", ugi); - } finally { - client.stop(); - } - } - - protected void waitForNMsToRegister() throws Exception { - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - RMContext rmContext = yarnCluster.getResourceManager().getRMContext(); - return (rmContext.getRMNodes().size() >= NUM_NMS); - } - }, 100, 60000); - } - - @Test - public void testContainerLaunchFailureHandling() throws Exception { - String[] args = createArguments( - "--num_containers", - "2", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--master_memory", - "512", - "--container_memory", - "128" - ); - - LOG.info("Initializing DS Client"); - Client client = new Client(ContainerLaunchFailAppMaster.class.getName(), - new Configuration(yarnCluster.getConfig())); - boolean initSuccess = client.init(args); - Assert.assertTrue(initSuccess); - LOG.info("Running DS Client"); - try { - boolean result = client.run(); - Assert.assertFalse(result); - } finally { - client.sendStopSignal(); - } - } - - @Test - public void testDebugFlag() throws Exception { - String[] args = createArguments( - "--num_containers", - "2", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1", - "--debug" - ); - - LOG.info("Initializing DS Client"); - Client client = new Client(new Configuration(yarnCluster.getConfig())); - Assert.assertTrue(client.init(args)); - LOG.info("Running DS Client"); - Assert.assertTrue(client.run()); - } - - private int verifyContainerLog(int containerNum, - List expectedContent, boolean count, String expectedWord) { - File logFolder = - new File(yarnCluster.getNodeManager(0).getConfig() - .get(YarnConfiguration.NM_LOG_DIRS, - YarnConfiguration.DEFAULT_NM_LOG_DIRS)); - - File[] listOfFiles = logFolder.listFiles(); - int currentContainerLogFileIndex = -1; - for (int i = listOfFiles.length - 1; i >= 0; i--) { - if (listOfFiles[i].listFiles().length == containerNum + 1) { - currentContainerLogFileIndex = i; - break; - } - } - Assert.assertTrue(currentContainerLogFileIndex != -1); - File[] containerFiles = - listOfFiles[currentContainerLogFileIndex].listFiles(); - - int numOfWords = 0; - for (int i = 0; i < containerFiles.length; i++) { - for (File output : containerFiles[i].listFiles()) { - if (output.getName().trim().contains("stdout")) { - BufferedReader br = null; - List stdOutContent = new ArrayList<>(); - try { - - String sCurrentLine; - br = new BufferedReader(new FileReader(output)); - int numOfline = 0; - while ((sCurrentLine = br.readLine()) != null) { - if (count) { - if (sCurrentLine.contains(expectedWord)) { - numOfWords++; - } - } else if (output.getName().trim().equals("stdout")){ - if (! 
Shell.WINDOWS) { - Assert.assertEquals("The current is" + sCurrentLine, - expectedContent.get(numOfline), sCurrentLine.trim()); - numOfline++; - } else { - stdOutContent.add(sCurrentLine.trim()); - } - } - } - /* By executing bat script using cmd /c, - * it will output all contents from bat script first - * It is hard for us to do check line by line - * Simply check whether output from bat file contains - * all the expected messages - */ - if (Shell.WINDOWS && !count - && output.getName().trim().equals("stdout")) { - Assert.assertTrue(stdOutContent.containsAll(expectedContent)); - } - } catch (IOException e) { - LOG.error("Exception reading the buffer", e); - } finally { - try { - if (br != null) - br.close(); - } catch (IOException ex) { - LOG.error("Exception closing the bufferReader", ex); - } - } - } - } - } - return numOfWords; - } - - @Test - public void testDistributedShellResourceProfiles() throws Exception { - int appNameCounter = 0; - String[][] args = { - createArgsWithPostFix(appNameCounter++, - "--num_containers", "1", "--shell_command", - Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile", - "maximum"), - createArgsWithPostFix(appNameCounter++, - "--num_containers", "1", "--shell_command", - Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", - "default"), - createArgsWithPostFix(appNameCounter++, - "--num_containers", "1", "--shell_command", - Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", - "default", "--container_resource_profile", "maximum"), - }; - - for (int i = 0; i < args.length; ++i) { - LOG.info("Initializing DS Client"); - Client client = new Client(new Configuration(yarnCluster.getConfig())); - Assert.assertTrue(client.init(args[i])); - LOG.info("Running DS Client"); - try { - client.run(); - Assert.fail("Client run should throw error"); - } catch (Exception e) { - continue; - } - } - } - - @Test - public void testDSShellWithOpportunisticContainers() throws Exception { - Client client = new Client(new Configuration(yarnCluster.getConfig())); - try { - String[] args = createArguments( - "--num_containers", - "2", - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1", - "--shell_command", - "date", - "--container_type", - "OPPORTUNISTIC" - ); - client.init(args); - assertTrue(client.run()); - } catch (Exception e) { - LOG.error("Job execution with opportunistic containers failed.", e); - Assert.fail("Exception. 
" + e.getMessage()); - } finally { - client.sendStopSignal(); - } - } - - @Test - @TimelineVersion(2.0f) - public void testDSShellWithEnforceExecutionType() throws Exception { - YarnClient yarnClient = null; - Client client = new Client(new Configuration(yarnCluster.getConfig())); - try { - String[] args = createArguments( - "--num_containers", - "2", - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1", - "--shell_command", - "date", - "--container_type", - "OPPORTUNISTIC", - "--enforce_execution_type" - ); - client.init(args); - final AtomicBoolean result = new AtomicBoolean(false); - Thread t = new Thread() { - public void run() { - try { - result.set(client.run()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - t.start(); - - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(new Configuration(yarnCluster.getConfig())); - yarnClient.start(); - waitForContainersLaunch(yarnClient, 2); - List apps = yarnClient.getApplications(); - ApplicationReport appReport = apps.get(0); - ApplicationId appId = appReport.getApplicationId(); - List appAttempts = - yarnClient.getApplicationAttempts(appId); - ApplicationAttemptReport appAttemptReport = appAttempts.get(0); - ApplicationAttemptId appAttemptId = - appAttemptReport.getApplicationAttemptId(); - List containers = - yarnClient.getContainers(appAttemptId); - // we should get two containers. - Assert.assertEquals(2, containers.size()); - ContainerId amContainerId = appAttemptReport.getAMContainerId(); - for (ContainerReport container : containers) { - if (!container.getContainerId().equals(amContainerId)) { - Assert.assertEquals(container.getExecutionType(), - ExecutionType.OPPORTUNISTIC); - } - } - } catch (Exception e) { - LOG.error("Job execution with enforce execution type failed.", e); - Assert.fail("Exception. 
" + e.getMessage()); - } finally { - client.sendStopSignal(); - if (yarnClient != null) { - yarnClient.stop(); - } - } - } - - private void waitForContainersLaunch(YarnClient client, - int nContainers) throws Exception { - GenericTestUtils.waitFor(new Supplier() { - public Boolean get() { - try { - List apps = client.getApplications(); - if (apps == null || apps.isEmpty()) { - return false; - } - ApplicationId appId = apps.get(0).getApplicationId(); - List appAttempts = - client.getApplicationAttempts(appId); - if (appAttempts == null || appAttempts.isEmpty()) { - return false; - } - ApplicationAttemptId attemptId = - appAttempts.get(0).getApplicationAttemptId(); - List containers = client.getContainers(attemptId); - return (containers.size() == nContainers); - } catch (Exception e) { - return false; - } - } - }, 10, 60000); - } - - @Test - @TimelineVersion(2.0f) - public void testDistributedShellWithResources() throws Exception { - doTestDistributedShellWithResources(false); - } - - @Test - @TimelineVersion(2.0f) - public void testDistributedShellWithResourcesWithLargeContainers() - throws Exception { - doTestDistributedShellWithResources(true); - } - - public void doTestDistributedShellWithResources(boolean largeContainers) - throws Exception { - Resource clusterResource = yarnCluster.getResourceManager() - .getResourceScheduler().getClusterResource(); - String masterMemoryString = "1 Gi"; - String containerMemoryString = "512 Mi"; - long[] memVars = {1024, 512}; - - Assume.assumeTrue("The cluster doesn't have enough memory for this test", - clusterResource.getMemorySize() >= memVars[0] + memVars[1]); - Assume.assumeTrue("The cluster doesn't have enough cores for this test", - clusterResource.getVirtualCores() >= 2); - if (largeContainers) { - memVars[0] = clusterResource.getMemorySize() * 2 / 3; - memVars[0] = memVars[0] - memVars[0] % MIN_ALLOCATION_MB; - masterMemoryString = memVars[0] + "Mi"; - memVars[1] = clusterResource.getMemorySize() / 3; - memVars[1] = memVars[1] - memVars[1] % MIN_ALLOCATION_MB; - containerMemoryString = String.valueOf(memVars[1]); - } - - String[] args = createArguments( - "--num_containers", - "2", - "--shell_command", - Shell.WINDOWS ? 
"dir" : "ls", - "--master_resources", - "memory=" + masterMemoryString + ",vcores=1", - "--container_resources", - "memory=" + containerMemoryString + ",vcores=1" - ); - - LOG.info("Initializing DS Client"); - Client client = new Client(new Configuration(yarnCluster.getConfig())); - Assert.assertTrue(client.init(args)); - LOG.info("Running DS Client"); - final AtomicBoolean result = new AtomicBoolean(false); - Thread t = new Thread() { - public void run() { - try { - result.set(client.run()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - t.start(); - - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(new Configuration(yarnCluster.getConfig())); - yarnClient.start(); - - final AtomicBoolean testFailed = new AtomicBoolean(false); - try { - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - if (testFailed.get()) { - return true; - } - List containers; - try { - List apps = yarnClient.getApplications(); - if (apps.isEmpty()) { - return false; - } - ApplicationReport appReport = apps.get(0); - ApplicationId appId = appReport.getApplicationId(); - List appAttempts = - yarnClient.getApplicationAttempts(appId); - if (appAttempts.isEmpty()) { - return false; - } - ApplicationAttemptReport appAttemptReport = appAttempts.get(0); - ContainerId amContainerId = appAttemptReport.getAMContainerId(); - if (amContainerId == null) { - return false; - } - ContainerReport report = yarnClient.getContainerReport( - amContainerId); - Resource masterResource = report.getAllocatedResource(); - Assert.assertEquals(memVars[0], - masterResource.getMemorySize()); - Assert.assertEquals(1, masterResource.getVirtualCores()); - containers = yarnClient.getContainers( - appAttemptReport.getApplicationAttemptId()); - if (containers.size() < 2) { - return false; - } - for (ContainerReport container : containers) { - if (!container.getContainerId().equals(amContainerId)) { - Resource containerResource = container.getAllocatedResource(); - Assert.assertEquals(memVars[1], - containerResource.getMemorySize()); - Assert.assertEquals(1, containerResource.getVirtualCores()); - } - } - return true; - } catch (Exception ex) { - LOG.error("Error waiting for expected results", ex); - testFailed.set(true); - } - return false; - } - }, 10, TEST_TIME_WINDOW_EXPIRE); - assertFalse(testFailed.get()); - } finally { - LOG.info("Signaling Client to Stop"); - client.sendStopSignal(); - if (yarnClient != null) { - LOG.info("Stopping yarnClient service"); - yarnClient.stop(); - } - } - } - - @Test(expected=ResourceNotFoundException.class) - public void testDistributedShellAMResourcesWithUnknownResource() - throws Exception { - String[] args = createArguments( - "--num_containers", - "1", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--master_resources", - "unknown-resource=5" - ); - Client client = new Client(new Configuration(yarnCluster.getConfig())); - client.init(args); - client.run(); - } - - @Test(expected=IllegalArgumentException.class) - public void testDistributedShellNonExistentQueue() - throws Exception { - String[] args = createArguments( - "--num_containers", - "1", - "--shell_command", - Shell.WINDOWS ? 
"dir" : "ls", - "--queue", - "non-existent-queue" - ); - Client client = new Client(new Configuration(yarnCluster.getConfig())); - client.init(args); - client.run(); - } - - @Test - public void testDistributedShellWithSingleFileLocalization() - throws Exception { - String[] args = createArguments( - "--num_containers", - "1", - "--shell_command", - Shell.WINDOWS ? "type" : "cat", - "--localize_files", - "./src/test/resources/a.txt", - "--shell_args", - "a.txt" - ); - - Client client = new Client(new Configuration(yarnCluster.getConfig())); - client.init(args); - assertTrue("Client exited with an error", client.run()); - } - - @Test - public void testDistributedShellWithMultiFileLocalization() - throws Exception { - String[] args = createArguments( - "--num_containers", - "1", - "--shell_command", - Shell.WINDOWS ? "type" : "cat", - "--localize_files", - "./src/test/resources/a.txt,./src/test/resources/b.txt", - "--shell_args", - "a.txt b.txt" - ); - - Client client = new Client(new Configuration(yarnCluster.getConfig())); - client.init(args); - assertTrue("Client exited with an error", client.run()); - } - - @Test(expected=UncheckedIOException.class) - public void testDistributedShellWithNonExistentFileLocalization() - throws Exception { - String[] args = createArguments( - "--num_containers", - "1", - "--shell_command", - Shell.WINDOWS ? "type" : "cat", - "--localize_files", - "/non/existing/path/file.txt", - "--shell_args", - "file.txt" - ); - - Client client = new Client(new Configuration(yarnCluster.getConfig())); - client.init(args); - assertTrue(client.run()); - } - - - @Test - public void testDistributedShellCleanup() - throws Exception { - String appName = "DistributedShellCleanup"; - String[] args = createArguments( - "--num_containers", - "1", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls" - ); - Configuration config = new Configuration(yarnCluster.getConfig()); - Client client = new Client(config); - try { - client.init(args); - client.run(); - ApplicationId appId = client.getAppId(); - String relativePath = - ApplicationMaster.getRelativePath(appName, appId.toString(), ""); - FileSystem fs1 = FileSystem.get(config); - Path path = new Path(fs1.getHomeDirectory(), relativePath); - - GenericTestUtils.waitFor(() -> { - try { - return !fs1.exists(path); - } catch (IOException e) { - return false; - } - }, 10, 60000); - - assertFalse("Distributed Shell Cleanup failed", fs1.exists(path)); - } finally { - client.sendStopSignal(); - } - } -}