YARN-10553. Refactor TestDistributedShell (#4159)

(cherry picked from commit 890f2da624)

 Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

Co-authored-by: Ahmed Hussein <50450311+amahussein@users.noreply.github.com>
This commit is contained in:
Akira Ajisaka 2022-04-13 12:22:17 +09:00 committed by GitHub
parent 07dface36a
commit 2112ef61e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 2375 additions and 2110 deletions

View File

@ -781,7 +781,7 @@ public class ApplicationMaster {
new HelpFormatter().printHelp("ApplicationMaster", opts);
}
private void cleanup() {
protected void cleanup() {
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override

View File

@ -1414,21 +1414,19 @@ public class Client {
}
int waitCount = 0;
LOG.info("Waiting for Client to exit loop");
while (!isRunning.get()) {
while (isRunning.get()) {
try {
Thread.sleep(50);
} catch (InterruptedException ie) {
// do nothing
} finally {
waitCount++;
if (isRunning.get() || waitCount > 2000) {
if (++waitCount > 2000) {
break;
}
}
}
LOG.info("Stopping yarnClient within the Client");
LOG.info("Stopping yarnClient within the DS Client");
yarnClient.stop();
yarnClient.waitForServiceToStop(clientTimeout);
LOG.info("done stopping Client");
}
}

View File

@ -0,0 +1,607 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
/**
* Base class for testing DistributedShell features.
*/
public abstract class DistributedShellBaseTest {
protected static final int MIN_ALLOCATION_MB = 128;
protected static final int NUM_DATA_NODES = 1;
protected static final int TEST_TIME_OUT = 160000;
// set the timeout of the yarnClient to be 95% of the globalTimeout.
protected static final int TEST_TIME_WINDOW_EXPIRE =
(TEST_TIME_OUT * 90) / 100;
private static final Logger LOG =
LoggerFactory.getLogger(DistributedShellBaseTest.class);
private static final String APP_MASTER_JAR =
JarFinder.getJar(ApplicationMaster.class);
private static final int NUM_NMS = 1;
// set the timeout of the yarnClient to be 95% of the globalTimeout.
private static final String YARN_CLIENT_TIMEOUT =
String.valueOf(TEST_TIME_WINDOW_EXPIRE);
private static final String[] COMMON_ARGS = {
"--jar",
APP_MASTER_JAR,
"--timeout",
YARN_CLIENT_TIMEOUT,
"--appname",
""
};
private static MiniDFSCluster hdfsCluster = null;
private static MiniYARNCluster yarnCluster = null;
private static String yarnSiteBackupPath = null;
private static String yarnSitePath = null;
@Rule
public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
TimeUnit.MILLISECONDS);
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule
public TestName name = new TestName();
private Client dsClient;
private YarnConfiguration conf = null;
// location of the filesystem timeline writer for timeline service v.2
private String timelineV2StorageDir = null;
@BeforeClass
public static void setupUnitTests() throws Exception {
URL url = Thread.currentThread().getContextClassLoader().getResource(
"yarn-site.xml");
if (url == null) {
throw new RuntimeException(
"Could not find 'yarn-site.xml' dummy file in classpath");
}
// backup the original yarn-site file.
yarnSitePath = url.getPath();
yarnSiteBackupPath = url.getPath() + "-backup";
Files.copy(Paths.get(yarnSitePath),
Paths.get(yarnSiteBackupPath),
StandardCopyOption.COPY_ATTRIBUTES,
StandardCopyOption.REPLACE_EXISTING);
}
@AfterClass
public static void tearDownUnitTests() throws Exception {
// shutdown the clusters.
shutdownYarnCluster();
shutdownHdfsCluster();
if (yarnSitePath == null || yarnSiteBackupPath == null) {
return;
}
// restore the original yarn-site file.
if (Files.exists(Paths.get(yarnSiteBackupPath))) {
Files.move(Paths.get(yarnSiteBackupPath), Paths.get(yarnSitePath),
StandardCopyOption.REPLACE_EXISTING);
}
}
/**
* Utility function to merge two String arrays to form a new String array for
* our arguments.
*
* @param args the first set of the arguments.
* @param newArgs the second set of the arguments.
* @return a String array consists of {args, newArgs}
*/
protected static String[] mergeArgs(String[] args, String[] newArgs) {
int length = args.length + newArgs.length;
String[] result = new String[length];
System.arraycopy(args, 0, result, 0, args.length);
System.arraycopy(newArgs, 0, result, args.length, newArgs.length);
return result;
}
protected static String[] createArguments(Supplier<String> testNameProvider,
String... args) {
String[] res = mergeArgs(COMMON_ARGS, args);
// set the application name so we can track down which command is running.
res[COMMON_ARGS.length - 1] = testNameProvider.get();
return res;
}
protected static String getSleepCommand(int sec) {
// Windows doesn't have a sleep command, ping -n does the trick
return Shell.WINDOWS ? "ping -n " + (sec + 1) + " 127.0.0.1 >nul"
: "sleep " + sec;
}
protected static String getListCommand() {
return Shell.WINDOWS ? "dir" : "ls";
}
protected static String getCatCommand() {
return Shell.WINDOWS ? "type" : "cat";
}
protected static void shutdownYarnCluster() {
if (yarnCluster != null) {
try {
yarnCluster.stop();
} finally {
yarnCluster = null;
}
}
}
protected static void shutdownHdfsCluster() {
if (hdfsCluster != null) {
try {
hdfsCluster.shutdown();
} finally {
hdfsCluster = null;
}
}
}
public String getTimelineV2StorageDir() {
return timelineV2StorageDir;
}
public void setTimelineV2StorageDir() throws Exception {
timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath();
}
@Before
public void setup() throws Exception {
setupInternal(NUM_NMS, new YarnConfiguration());
}
@After
public void tearDown() throws IOException {
cleanUpDFSClient();
FileContext fsContext = FileContext.getLocalFSFileContext();
fsContext
.delete(
new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
true);
shutdownYarnCluster();
shutdownHdfsCluster();
}
protected String[] createArgumentsWithAppName(String... args) {
return createArguments(() -> generateAppName(), args);
}
protected void waitForContainersLaunch(YarnClient client, int nContainers,
AtomicReference<ApplicationAttemptReport> appAttemptReportRef,
AtomicReference<List<ContainerReport>> containersListRef,
AtomicReference<ApplicationAttemptId> appAttemptIdRef,
AtomicReference<Throwable> thrownErrorRef) throws Exception {
GenericTestUtils.waitFor(() -> {
try {
List<ApplicationReport> apps = client.getApplications();
if (apps == null || apps.isEmpty()) {
return false;
}
ApplicationId appId = apps.get(0).getApplicationId();
List<ApplicationAttemptReport> appAttempts =
client.getApplicationAttempts(appId);
if (appAttempts == null || appAttempts.isEmpty()) {
return false;
}
ApplicationAttemptId attemptId =
appAttempts.get(0).getApplicationAttemptId();
List<ContainerReport> containers = client.getContainers(attemptId);
if (containers == null || containers.size() < nContainers) {
return false;
}
containersListRef.set(containers);
appAttemptIdRef.set(attemptId);
appAttemptReportRef.set(appAttempts.get(0));
} catch (Exception e) {
LOG.error("Exception waiting for Containers Launch", e);
thrownErrorRef.set(e);
}
return true;
}, 10, TEST_TIME_WINDOW_EXPIRE);
}
protected abstract void customizeConfiguration(YarnConfiguration config)
throws Exception;
protected String[] appendFlowArgsForTestDSShell(String[] args,
boolean defaultFlow) {
return args;
}
protected String[] appendDomainArgsForTestDSShell(String[] args,
boolean haveDomain) {
String[] result = args;
if (haveDomain) {
String[] domainArgs = {
"--domain",
"TEST_DOMAIN",
"--view_acls",
"reader_user reader_group",
"--modify_acls",
"writer_user writer_group",
"--create"
};
result = mergeArgs(args, domainArgs);
}
return result;
}
protected Client setAndGetDSClient(Configuration config) throws Exception {
dsClient = new Client(config);
return dsClient;
}
protected Client setAndGetDSClient(String appMasterMainClass,
Configuration config) throws Exception {
dsClient = new Client(appMasterMainClass, config);
return dsClient;
}
protected void baseTestDSShell(boolean haveDomain, boolean defaultFlow)
throws Exception {
String[] baseArgs = createArgumentsWithAppName(
"--num_containers",
"2",
"--shell_command",
getListCommand(),
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1");
String[] domainArgs = appendDomainArgsForTestDSShell(baseArgs, haveDomain);
String[] args = appendFlowArgsForTestDSShell(domainArgs, defaultFlow);
LOG.info("Initializing DS Client");
YarnClient yarnClient;
dsClient = setAndGetDSClient(new Configuration(yarnCluster.getConfig()));
boolean initSuccess = dsClient.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
final AtomicBoolean result = new AtomicBoolean(false);
Thread t = new Thread(() -> {
try {
result.set(dsClient.run());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
t.start();
yarnClient = YarnClient.createYarnClient();
yarnClient.init(new Configuration(yarnCluster.getConfig()));
yarnClient.start();
AtomicInteger waitResult = new AtomicInteger(0);
AtomicReference<ApplicationId> appIdRef =
new AtomicReference<>(null);
AtomicReference<ApplicationReport> appReportRef =
new AtomicReference<>(null);
GenericTestUtils.waitFor(() -> {
try {
List<ApplicationReport> apps = yarnClient.getApplications();
if (apps.size() == 0) {
return false;
}
ApplicationReport appReport = apps.get(0);
appReportRef.set(appReport);
appIdRef.set(appReport.getApplicationId());
if (appReport.getHost().equals("N/A")) {
return false;
}
if (appReport.getRpcPort() == -1) {
waitResult.set(1);
}
if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
&& appReport.getFinalApplicationStatus() !=
FinalApplicationStatus.UNDEFINED) {
return true;
}
} catch (Exception e) {
LOG.error("Exception get application from Yarn Client", e);
waitResult.set(2);
}
return waitResult.get() != 0;
}, 10, TEST_TIME_WINDOW_EXPIRE);
t.join();
if (waitResult.get() == 2) {
// Exception was raised
Assert.fail("Exception in getting application report. Failed");
}
if (waitResult.get() == 1) {
Assert.assertEquals("Failed waiting for expected rpc port to be -1.",
-1, appReportRef.get().getRpcPort());
}
checkTimeline(appIdRef.get(), defaultFlow, haveDomain, appReportRef.get());
}
protected void baseTestDSShell(boolean haveDomain) throws Exception {
baseTestDSShell(haveDomain, true);
}
protected void checkTimeline(ApplicationId appId,
boolean defaultFlow, boolean haveDomain,
ApplicationReport appReport) throws Exception {
TimelineDomain domain = null;
if (haveDomain) {
domain = yarnCluster.getApplicationHistoryServer()
.getTimelineStore().getDomain("TEST_DOMAIN");
Assert.assertNotNull(domain);
Assert.assertEquals("reader_user reader_group", domain.getReaders());
Assert.assertEquals("writer_user writer_group", domain.getWriters());
}
TimelineEntities entitiesAttempts = yarnCluster
.getApplicationHistoryServer()
.getTimelineStore()
.getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
null, null, null, null, null, null, null, null, null);
Assert.assertNotNull(entitiesAttempts);
Assert.assertEquals(1, entitiesAttempts.getEntities().size());
Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
.size());
Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType(),
ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
Assert.assertEquals(haveDomain ? domain.getId() : "DEFAULT",
entitiesAttempts.getEntities().get(0).getDomainId());
String currAttemptEntityId =
entitiesAttempts.getEntities().get(0).getEntityId();
ApplicationAttemptId attemptId = ApplicationAttemptId.fromString(
currAttemptEntityId);
NameValuePair primaryFilter = new NameValuePair(
ApplicationMaster.APPID_TIMELINE_FILTER_NAME,
attemptId.getApplicationId().toString());
TimelineEntities entities = yarnCluster
.getApplicationHistoryServer()
.getTimelineStore()
.getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
null, null, null, null, primaryFilter, null, null, null);
Assert.assertNotNull(entities);
Assert.assertEquals(2, entities.getEntities().size());
Assert.assertEquals(entities.getEntities().get(0).getEntityType(),
ApplicationMaster.DSEntity.DS_CONTAINER.toString());
String entityId = entities.getEntities().get(0).getEntityId();
TimelineEntity entity =
yarnCluster.getApplicationHistoryServer().getTimelineStore()
.getEntity(entityId,
ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null);
Assert.assertNotNull(entity);
Assert.assertEquals(entityId, entity.getEntityId());
Assert.assertEquals(haveDomain ? domain.getId() : "DEFAULT",
entities.getEntities().get(0).getDomainId());
}
protected String[] createArgsWithPostFix(int index, String... args) {
String[] res = mergeArgs(COMMON_ARGS, args);
// set the application name so we can track down which command is running.
res[COMMON_ARGS.length - 1] = generateAppName(String.format("%03d",
index));
return res;
}
protected String generateAppName() {
return generateAppName(null);
}
protected String generateAppName(String postFix) {
return name.getMethodName().replaceFirst("test", "")
.concat(postFix == null ? "" : "-" + postFix);
}
protected void setUpHDFSCluster() throws IOException {
if (hdfsCluster == null) {
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
.numDataNodes(NUM_DATA_NODES).build();
hdfsCluster.waitActive();
}
}
protected void setUpYarnCluster(int numNodeManagers,
YarnConfiguration yarnConfig) throws Exception {
if (yarnCluster != null) {
return;
}
yarnCluster =
new MiniYARNCluster(getClass().getSimpleName(), 1, numNodeManagers,
1, 1);
yarnCluster.init(yarnConfig);
yarnCluster.start();
// wait for the node managers to register.
waitForNMsToRegister();
conf.set(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":"
+ yarnCluster.getApplicationHistoryServer().getPort());
Configuration yarnClusterConfig = yarnCluster.getConfig();
yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
new File(yarnSitePath).getParent());
// write the document to a buffer (not directly to the file, as that
// can cause the file being written to get read -which will then fail.
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
yarnClusterConfig.writeXml(bytesOut);
bytesOut.close();
// write the bytes to the file in the classpath
OutputStream os = new FileOutputStream(yarnSitePath);
os.write(bytesOut.toByteArray());
os.close();
}
protected void setupInternal(int numNodeManagers,
YarnConfiguration yarnConfig) throws Exception {
LOG.info("========== Setting UP UnitTest {}#{} ==========",
getClass().getCanonicalName(), name.getMethodName());
LOG.info("Starting up YARN cluster. Timeline version {}",
getTimelineVersion());
conf = yarnConfig;
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
MIN_ALLOCATION_MB);
// reduce the tearDown waiting time
conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 500);
conf.set("yarn.log.dir", "target");
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
// mark if we need to launch the v1 timeline server
// disable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
conf.set("mapreduce.jobhistory.address",
"0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
// Enable ContainersMonitorImpl
conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
LinuxResourceCalculatorPlugin.class.getName());
conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
ProcfsBasedProcessTree.class.getName());
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
conf.setBoolean(
YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, true);
conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
true);
conf.setBoolean(
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
10);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// ATS version specific settings
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
getTimelineVersion());
// setup the configuration of relevant for each TimelineService version.
customizeConfiguration(conf);
// setup the yarn cluster.
setUpYarnCluster(numNodeManagers, conf);
}
protected NodeManager getNodeManager(int index) {
return yarnCluster.getNodeManager(index);
}
protected MiniYARNCluster getYarnCluster() {
return yarnCluster;
}
protected void setConfiguration(String key, String value) {
conf.set(key, value);
}
protected Configuration getYarnClusterConfiguration() {
return yarnCluster.getConfig();
}
protected Configuration getConfiguration() {
return conf;
}
protected ResourceManager getResourceManager() {
return yarnCluster.getResourceManager();
}
protected ResourceManager getResourceManager(int index) {
return yarnCluster.getResourceManager(index);
}
protected Client getDSClient() {
return dsClient;
}
protected void resetDSClient() {
dsClient = null;
}
protected abstract float getTimelineVersion();
protected void cleanUpDFSClient() {
if (getDSClient() != null) {
getDSClient().sendStopSignal();
resetDSClient();
}
}
private void waitForNMsToRegister() throws Exception {
GenericTestUtils.waitFor(() -> {
RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
return (rmContext.getRMNodes().size() >= NUM_NMS);
}, 100, 60000);
}
protected MiniDFSCluster getHDFSCluster() {
return hdfsCluster;
}
}

View File

@ -18,11 +18,10 @@
package org.apache.hadoop.yarn.applications.distributedshell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestDSSleepingAppMaster extends ApplicationMaster{
public class TestDSSleepingAppMaster extends ApplicationMaster {
private static final Logger LOG = LoggerFactory
.getLogger(TestDSSleepingAppMaster.class);
@ -30,8 +29,8 @@ public class TestDSSleepingAppMaster extends ApplicationMaster{
public static void main(String[] args) {
boolean result = false;
TestDSSleepingAppMaster appMaster = new TestDSSleepingAppMaster();
try {
TestDSSleepingAppMaster appMaster = new TestDSSleepingAppMaster();
boolean doRun = appMaster.init(args);
if (!doRun) {
System.exit(0);
@ -48,6 +47,10 @@ public class TestDSSleepingAppMaster extends ApplicationMaster{
result = appMaster.finish();
} catch (Throwable t) {
System.exit(1);
} finally {
if (appMaster != null) {
appMaster.cleanup();
}
}
if (result) {
LOG.info("Application Master completed successfully. exiting");

View File

@ -0,0 +1,843 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.cli.MissingArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
* Unit tests implementations for distributed shell on TimeLineV1.
*/
public class TestDSTimelineV10 extends DistributedShellBaseTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestDSTimelineV10.class);
@Override
protected float getTimelineVersion() {
return 1.0f;
}
@Override
protected void cleanUpDFSClient() {
}
@Test
public void testDSShellWithDomain() throws Exception {
baseTestDSShell(true);
}
@Test
public void testDSShellWithoutDomain() throws Exception {
baseTestDSShell(false);
}
@Test
public void testDSRestartWithPreviousRunningContainers() throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"1",
"--shell_command",
getSleepCommand(8),
"--master_memory",
"512",
"--container_memory",
"128",
"--keep_containers_across_application_attempts"
);
LOG.info("Initializing DS Client");
setAndGetDSClient(TestDSFailedAppMaster.class.getName(),
new Configuration(getYarnClusterConfiguration()));
getDSClient().init(args);
LOG.info("Running DS Client");
boolean result = getDSClient().run();
LOG.info("Client run completed. Result={}", result);
// application should succeed
Assert.assertTrue(result);
}
/*
* The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
* Set attempt_failures_validity_interval as 2.5 seconds. It will check
* how many attempt failures for previous 2.5 seconds.
* The application is expected to be successful.
*/
@Test
public void testDSAttemptFailuresValidityIntervalSuccess() throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"1",
"--shell_command",
getSleepCommand(8),
"--master_memory",
"512",
"--container_memory",
"128",
"--attempt_failures_validity_interval",
"2500"
);
LOG.info("Initializing DS Client");
Configuration config = getYarnClusterConfiguration();
config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
setAndGetDSClient(TestDSSleepingAppMaster.class.getName(),
new Configuration(config));
getDSClient().init(args);
LOG.info("Running DS Client");
boolean result = getDSClient().run();
LOG.info("Client run completed. Result=" + result);
// application should succeed
Assert.assertTrue(result);
}
/*
* The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
* Set attempt_failures_validity_interval as 15 seconds. It will check
* how many attempt failure for previous 15 seconds.
* The application is expected to be fail.
*/
@Test
public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"1",
"--shell_command",
getSleepCommand(8),
"--master_memory",
"512",
"--container_memory",
"128",
"--attempt_failures_validity_interval",
"15000"
);
LOG.info("Initializing DS Client");
Configuration config = getYarnClusterConfiguration();
config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
setAndGetDSClient(TestDSSleepingAppMaster.class.getName(),
new Configuration(config));
getDSClient().init(args);
LOG.info("Running DS Client");
boolean result = getDSClient().run();
LOG.info("Client run completed. Result=" + result);
// application should be failed
Assert.assertFalse(result);
}
@Test
public void testDSShellWithCustomLogPropertyFile() throws Exception {
final File basedir = getBaseDirForTest();
final File tmpDir = new File(basedir, "tmpDir");
tmpDir.mkdirs();
final File customLogProperty = new File(tmpDir, "custom_log4j.properties");
if (customLogProperty.exists()) {
customLogProperty.delete();
}
if (!customLogProperty.createNewFile()) {
Assert.fail("Can not create custom log4j property file.");
}
PrintWriter fileWriter = new PrintWriter(customLogProperty);
// set the output to DEBUG level
fileWriter.write("log4j.rootLogger=debug,stdout");
fileWriter.close();
String[] args = createArgumentsWithAppName(
"--num_containers",
"3",
"--shell_command",
"echo",
"--shell_args",
"HADOOP",
"--log_properties",
customLogProperty.getAbsolutePath(),
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1"
);
// Before run the DS, the default the log level is INFO
final Logger LOG_Client =
LoggerFactory.getLogger(Client.class);
Assert.assertTrue(LOG_Client.isInfoEnabled());
Assert.assertFalse(LOG_Client.isDebugEnabled());
final Logger LOG_AM = LoggerFactory.getLogger(ApplicationMaster.class);
Assert.assertTrue(LOG_AM.isInfoEnabled());
Assert.assertFalse(LOG_AM.isDebugEnabled());
LOG.info("Initializing DS Client");
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
boolean initSuccess = getDSClient().init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = getDSClient().run();
LOG.info("Client run completed. Result=" + result);
Assert.assertTrue(verifyContainerLog(3, null, true, "DEBUG") > 10);
//After DS is finished, the log level should be DEBUG
Assert.assertTrue(LOG_Client.isInfoEnabled());
Assert.assertTrue(LOG_Client.isDebugEnabled());
Assert.assertTrue(LOG_AM.isInfoEnabled());
Assert.assertTrue(LOG_AM.isDebugEnabled());
}
@Test
public void testSpecifyingLogAggregationContext() throws Exception {
String regex = ".*(foo|bar)\\d";
String[] args = createArgumentsWithAppName(
"--shell_command",
"echo",
"--rolling_log_pattern",
regex
);
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
Assert.assertTrue(getDSClient().init(args));
ApplicationSubmissionContext context =
Records.newRecord(ApplicationSubmissionContext.class);
getDSClient().specifyLogAggregationContext(context);
LogAggregationContext logContext = context.getLogAggregationContext();
assertEquals(logContext.getRolledLogsIncludePattern(), regex);
assertTrue(logContext.getRolledLogsExcludePattern().isEmpty());
}
@Test
public void testDSShellWithMultipleArgs() throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"4",
"--shell_command",
"echo",
"--shell_args",
"HADOOP YARN MAPREDUCE HDFS",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1"
);
LOG.info("Initializing DS Client");
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
boolean initSuccess = getDSClient().init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = getDSClient().run();
LOG.info("Client run completed. Result=" + result);
List<String> expectedContent = new ArrayList<>();
expectedContent.add("HADOOP YARN MAPREDUCE HDFS");
verifyContainerLog(4, expectedContent, false, "");
}
@Test
public void testDSShellWithShellScript() throws Exception {
final File basedir = getBaseDirForTest();
final File tmpDir = new File(basedir, "tmpDir");
tmpDir.mkdirs();
final File customShellScript = new File(tmpDir, "custom_script.sh");
if (customShellScript.exists()) {
customShellScript.delete();
}
if (!customShellScript.createNewFile()) {
Assert.fail("Can not create custom shell script file.");
}
PrintWriter fileWriter = new PrintWriter(customShellScript);
// set the output to DEBUG level
fileWriter.write("echo testDSShellWithShellScript");
fileWriter.close();
LOG.info(customShellScript.getAbsolutePath());
String[] args = createArgumentsWithAppName(
"--num_containers",
"1",
"--shell_script",
customShellScript.getAbsolutePath(),
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1"
);
LOG.info("Initializing DS Client");
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
Assert.assertTrue(getDSClient().init(args));
LOG.info("Running DS Client");
assertTrue(getDSClient().run());
List<String> expectedContent = new ArrayList<>();
expectedContent.add("testDSShellWithShellScript");
verifyContainerLog(1, expectedContent, false, "");
}
@Test
public void testDSShellWithInvalidArgs() throws Exception {
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
int appNameCounter = 0;
LOG.info("Initializing DS Client with no args");
LambdaTestUtils.intercept(IllegalArgumentException.class,
"No args",
() -> getDSClient().init(new String[]{}));
LOG.info("Initializing DS Client with no jar file");
String[] noJarArgs = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--shell_command",
getListCommand(),
"--master_memory",
"512",
"--container_memory",
"128"
);
String[] argsNoJar = Arrays.copyOfRange(noJarArgs, 2, noJarArgs.length);
LambdaTestUtils.intercept(IllegalArgumentException.class,
"No jar",
() -> getDSClient().init(argsNoJar));
LOG.info("Initializing DS Client with no shell command");
String[] noShellCmdArgs = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--master_memory",
"512",
"--container_memory",
"128"
);
LambdaTestUtils.intercept(IllegalArgumentException.class,
"No shell command",
() -> getDSClient().init(noShellCmdArgs));
LOG.info("Initializing DS Client with invalid no. of containers");
String[] numContainersArgs = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"-1",
"--shell_command",
getListCommand(),
"--master_memory",
"512",
"--container_memory",
"128"
);
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Invalid no. of containers",
() -> getDSClient().init(numContainersArgs));
LOG.info("Initializing DS Client with invalid no. of vcores");
String[] vCoresArgs = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--shell_command",
getListCommand(),
"--master_memory",
"512",
"--master_vcores",
"-2",
"--container_memory",
"128",
"--container_vcores",
"1"
);
getDSClient().init(vCoresArgs);
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Invalid virtual cores specified",
() -> {
getDSClient().init(vCoresArgs);
getDSClient().run();
});
LOG.info("Initializing DS Client with --shell_command and --shell_script");
String[] scriptAndCmdArgs = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--shell_command",
getListCommand(),
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--shell_script",
"test.sh"
);
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Can not specify shell_command option and shell_script option at "
+ "the same time",
() -> getDSClient().init(scriptAndCmdArgs));
LOG.info(
"Initializing DS Client without --shell_command and --shell_script");
String[] noShellCmdNoScriptArgs = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1"
);
LambdaTestUtils.intercept(IllegalArgumentException.class,
"No shell command or shell script specified "
+ "to be executed by application master",
() -> getDSClient().init(noShellCmdNoScriptArgs));
LOG.info("Initializing DS Client with invalid container_type argument");
String[] invalidTypeArgs = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--shell_command",
"date",
"--container_type",
"UNSUPPORTED_TYPE"
);
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Invalid container_type: UNSUPPORTED_TYPE",
() -> getDSClient().init(invalidTypeArgs));
String[] invalidMemArgs = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"1",
"--shell_command",
getListCommand(),
"--master_resources",
"memory-mb=invalid"
);
LambdaTestUtils.intercept(IllegalArgumentException.class,
() -> getDSClient().init(invalidMemArgs));
String[] invalidMasterResArgs = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"1",
"--shell_command",
getListCommand(),
"--master_resources"
);
LambdaTestUtils.intercept(MissingArgumentException.class,
() -> getDSClient().init(invalidMasterResArgs));
}
@Test
public void testDSTimelineClientWithConnectionRefuse() throws Exception {
ApplicationMaster am = new ApplicationMaster();
final AtomicReference<TimelineWriter> spyTimelineWriterRef =
new AtomicReference<>(null);
TimelineClientImpl client = new TimelineClientImpl() {
@Override
protected TimelineWriter createTimelineWriter(Configuration conf,
UserGroupInformation authUgi, com.sun.jersey.api.client.Client client,
URI resURI) throws IOException {
TimelineWriter timelineWriter =
new DirectTimelineWriter(authUgi, client, resURI);
spyTimelineWriterRef.set(spy(timelineWriter));
return spyTimelineWriterRef.get();
}
};
client.init(getConfiguration());
client.start();
TestTimelineClient.mockEntityClientResponse(spyTimelineWriterRef.get(),
null, false, true);
try {
UserGroupInformation ugi = mock(UserGroupInformation.class);
when(ugi.getShortUserName()).thenReturn("user1");
// verify no ClientHandlerException get thrown out.
am.publishContainerEndEvent(client, ContainerStatus.newInstance(
BuilderUtils.newContainerId(1, 1, 1, 1), ContainerState.COMPLETE, "",
1), "domainId", ugi);
} finally {
client.stop();
}
}
@Test
public void testContainerLaunchFailureHandling() throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"2",
"--shell_command",
getListCommand(),
"--master_memory",
"512",
"--container_memory",
"128"
);
LOG.info("Initializing DS Client");
setAndGetDSClient(ContainerLaunchFailAppMaster.class.getName(),
new Configuration(getYarnClusterConfiguration()));
Assert.assertTrue(getDSClient().init(args));
LOG.info("Running DS Client");
Assert.assertFalse(getDSClient().run());
}
@Test
public void testDebugFlag() throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"2",
"--shell_command",
getListCommand(),
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--debug"
);
LOG.info("Initializing DS Client");
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
Assert.assertTrue(getDSClient().init(args));
LOG.info("Running DS Client");
Assert.assertTrue(getDSClient().run());
}
private int verifyContainerLog(int containerNum,
List<String> expectedContent, boolean count, String expectedWord) {
File logFolder =
new File(getNodeManager(0).getConfig()
.get(YarnConfiguration.NM_LOG_DIRS,
YarnConfiguration.DEFAULT_NM_LOG_DIRS));
File[] listOfFiles = logFolder.listFiles();
Assert.assertNotNull(listOfFiles);
int currentContainerLogFileIndex = -1;
for (int i = listOfFiles.length - 1; i >= 0; i--) {
if (listOfFiles[i].listFiles().length == containerNum + 1) {
currentContainerLogFileIndex = i;
break;
}
}
Assert.assertTrue(currentContainerLogFileIndex != -1);
File[] containerFiles =
listOfFiles[currentContainerLogFileIndex].listFiles();
int numOfWords = 0;
for (File containerFile : containerFiles) {
if (containerFile == null) {
continue;
}
for (File output : containerFile.listFiles()) {
if (output.getName().trim().contains("stdout")) {
List<String> stdOutContent = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new FileReader(output))) {
String sCurrentLine;
int numOfline = 0;
while ((sCurrentLine = br.readLine()) != null) {
if (count) {
if (sCurrentLine.contains(expectedWord)) {
numOfWords++;
}
} else if (output.getName().trim().equals("stdout")) {
if (!Shell.WINDOWS) {
Assert.assertEquals("The current is" + sCurrentLine,
expectedContent.get(numOfline), sCurrentLine.trim());
numOfline++;
} else {
stdOutContent.add(sCurrentLine.trim());
}
}
}
/* By executing bat script using cmd /c,
* it will output all contents from bat script first
* It is hard for us to do check line by line
* Simply check whether output from bat file contains
* all the expected messages
*/
if (Shell.WINDOWS && !count
&& output.getName().trim().equals("stdout")) {
Assert.assertTrue(stdOutContent.containsAll(expectedContent));
}
} catch (IOException e) {
LOG.error("Exception reading the buffer", e);
}
}
}
}
return numOfWords;
}
@Test
public void testDistributedShellResourceProfiles() throws Exception {
int appNameCounter = 0;
String[][] args = {
createArgsWithPostFix(appNameCounter++,
"--num_containers", "1", "--shell_command",
getListCommand(), "--container_resource_profile",
"maximum"),
createArgsWithPostFix(appNameCounter++,
"--num_containers", "1", "--shell_command",
getListCommand(), "--master_resource_profile",
"default"),
createArgsWithPostFix(appNameCounter++,
"--num_containers", "1", "--shell_command",
getListCommand(), "--master_resource_profile",
"default", "--container_resource_profile", "maximum"),
};
for (int i = 0; i < args.length; ++i) {
LOG.info("Initializing DS Client[{}]", i);
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
Assert.assertTrue(getDSClient().init(args[i]));
LOG.info("Running DS Client[{}]", i);
LambdaTestUtils.intercept(Exception.class,
() -> getDSClient().run());
}
}
@Test
public void testDSShellWithOpportunisticContainers() throws Exception {
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
String[] args = createArgumentsWithAppName(
"--num_containers",
"2",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--shell_command",
"date",
"--container_type",
"OPPORTUNISTIC"
);
assertTrue(getDSClient().init(args));
assertTrue(getDSClient().run());
}
@Test(expected = ResourceNotFoundException.class)
public void testDistributedShellAMResourcesWithUnknownResource()
throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"1",
"--shell_command",
getListCommand(),
"--master_resources",
"unknown-resource=5"
);
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
assertTrue(getDSClient().init(args));
getDSClient().run();
}
@Test(expected = IllegalArgumentException.class)
public void testDistributedShellNonExistentQueue()
throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"1",
"--shell_command",
getListCommand(),
"--queue",
"non-existent-queue"
);
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
assertTrue(getDSClient().init(args));
getDSClient().run();
}
@Test
public void testDistributedShellWithSingleFileLocalization()
throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"1",
"--shell_command",
getCatCommand(),
"--localize_files",
"./src/test/resources/a.txt",
"--shell_args",
"a.txt"
);
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
assertTrue(getDSClient().init(args));
assertTrue("Client exited with an error", getDSClient().run());
}
@Test
public void testDistributedShellWithMultiFileLocalization()
throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"1",
"--shell_command",
getCatCommand(),
"--localize_files",
"./src/test/resources/a.txt,./src/test/resources/b.txt",
"--shell_args",
"a.txt b.txt"
);
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
assertTrue(getDSClient().init(args));
assertTrue("Client exited with an error", getDSClient().run());
}
@Test(expected = UncheckedIOException.class)
public void testDistributedShellWithNonExistentFileLocalization()
throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"1",
"--shell_command",
getCatCommand(),
"--localize_files",
"/non/existing/path/file.txt",
"--shell_args",
"file.txt"
);
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
assertTrue(getDSClient().init(args));
assertTrue(getDSClient().run());
}
@Test
public void testDistributedShellCleanup()
throws Exception {
String[] args = createArgumentsWithAppName(
"--num_containers",
"1",
"--shell_command",
getListCommand()
);
Configuration config = new Configuration(getYarnClusterConfiguration());
setAndGetDSClient(config);
assertTrue(getDSClient().init(args));
assertTrue(getDSClient().run());
ApplicationId appId = getDSClient().getAppId();
String relativePath =
ApplicationMaster.getRelativePath(generateAppName(),
appId.toString(), "");
FileSystem fs1 = FileSystem.get(config);
Path path = new Path(fs1.getHomeDirectory(), relativePath);
GenericTestUtils.waitFor(() -> {
try {
return !fs1.exists(path);
} catch (IOException e) {
return false;
}
}, 10, 60000);
assertFalse("Distributed Shell Cleanup failed", fs1.exists(path));
}
@Override
protected void customizeConfiguration(
YarnConfiguration config) throws Exception {
config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
}
private static File getBaseDirForTest() {
return new File("target", TestDSTimelineV10.class.getName());
}
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
/**
* Unit tests implementations for distributed shell on TimeLineV1.5.
*/
public class TestDSTimelineV15 extends DistributedShellBaseTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestDSTimelineV15.class);
@Override
protected float getTimelineVersion() {
return 1.5f;
}
@Override
protected void customizeConfiguration(
YarnConfiguration config) throws Exception {
setUpHDFSCluster();
PluginStoreTestUtils.prepareFileSystemForPluginStore(
getHDFSCluster().getFileSystem());
PluginStoreTestUtils.prepareConfiguration(config, getHDFSCluster());
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
DistributedShellTimelinePlugin.class.getName());
}
@Override
protected void checkTimeline(ApplicationId appId,
boolean defaultFlow, boolean haveDomain,
ApplicationReport appReport) throws Exception {
long scanInterval = getConfiguration().getLong(
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
);
Path doneDir = new Path(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
);
// Wait till the data is moved to done dir, or timeout and fail
AtomicReference<Exception> exceptionRef = new AtomicReference<>(null);
GenericTestUtils.waitFor(() -> {
try {
RemoteIterator<FileStatus> iterApps =
getHDFSCluster().getFileSystem().listStatusIterator(doneDir);
return (iterApps.hasNext());
} catch (Exception e) {
exceptionRef.set(e);
LOG.error("Exception listing Done Dir", e);
return true;
}
}, scanInterval * 2, TEST_TIME_WINDOW_EXPIRE);
Assert.assertNull("Exception in getting listing status",
exceptionRef.get());
super.checkTimeline(appId, defaultFlow, haveDomain, appReport);
}
@Test
public void testDSShellWithDomain() throws Exception {
baseTestDSShell(true);
}
@Test
public void testDSShellWithoutDomain() throws Exception {
baseTestDSShell(false);
}
}

View File

@ -0,0 +1,484 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* Unit tests implementations for distributed shell on TimeLineV2.
*/
public class TestDSTimelineV20 extends DistributedShellBaseTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestDSTimelineV20.class);
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
@Override
protected float getTimelineVersion() {
return 2.0f;
}
@Override
protected void customizeConfiguration(
YarnConfiguration config) throws Exception {
// disable v1 timeline server since we no longer have a server here
// enable aux-service based timeline aggregators
config.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
config.set(YarnConfiguration.NM_AUX_SERVICES + "." +
TIMELINE_AUX_SERVICE_NAME + ".class",
PerNodeTimelineCollectorsAuxService.class.getName());
config.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class,
org.apache.hadoop.yarn.server.timelineservice.storage.
TimelineWriter.class);
setTimelineV2StorageDir();
// set the file system timeline writer storage directory
config.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
getTimelineV2StorageDir());
}
@Test
public void testDSShellWithEnforceExecutionType() throws Exception {
YarnClient yarnClient = null;
AtomicReference<Throwable> thrownError = new AtomicReference<>(null);
AtomicReference<List<ContainerReport>> containersListRef =
new AtomicReference<>(null);
AtomicReference<ApplicationAttemptId> appAttemptIdRef =
new AtomicReference<>(null);
AtomicReference<ApplicationAttemptReport> appAttemptReportRef =
new AtomicReference<>(null);
String[] args = createArgumentsWithAppName(
"--num_containers",
"2",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--shell_command",
getListCommand(),
"--container_type",
"OPPORTUNISTIC",
"--enforce_execution_type"
);
try {
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
getDSClient().init(args);
Thread dsClientRunner = new Thread(() -> {
try {
getDSClient().run();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
dsClientRunner.start();
yarnClient = YarnClient.createYarnClient();
yarnClient.init(new Configuration(getYarnClusterConfiguration()));
yarnClient.start();
// expecting three containers including the AM container.
waitForContainersLaunch(yarnClient, 3, appAttemptReportRef,
containersListRef, appAttemptIdRef, thrownError);
if (thrownError.get() != null) {
Assert.fail(thrownError.get().getMessage());
}
ContainerId amContainerId = appAttemptReportRef.get().getAMContainerId();
for (ContainerReport container : containersListRef.get()) {
if (!container.getContainerId().equals(amContainerId)) {
Assert.assertEquals(container.getExecutionType(),
ExecutionType.OPPORTUNISTIC);
}
}
} catch (Exception e) {
LOG.error("Job execution with enforce execution type failed.", e);
Assert.fail("Exception. " + e.getMessage());
} finally {
if (yarnClient != null) {
yarnClient.stop();
}
}
}
@Test
public void testDistributedShellWithResources() throws Exception {
doTestDistributedShellWithResources(false);
}
@Test
public void testDistributedShellWithResourcesWithLargeContainers()
throws Exception {
doTestDistributedShellWithResources(true);
}
private void doTestDistributedShellWithResources(boolean largeContainers)
throws Exception {
AtomicReference<Throwable> thrownExceptionRef =
new AtomicReference<>(null);
AtomicReference<List<ContainerReport>> containersListRef =
new AtomicReference<>(null);
AtomicReference<ApplicationAttemptId> appAttemptIdRef =
new AtomicReference<>(null);
AtomicReference<ApplicationAttemptReport> appAttemptReportRef =
new AtomicReference<>(null);
Resource clusterResource = getYarnCluster().getResourceManager()
.getResourceScheduler().getClusterResource();
String masterMemoryString = "1 Gi";
String containerMemoryString = "512 Mi";
long[] memVars = {1024, 512};
YarnClient yarnClient = null;
Assume.assumeTrue("The cluster doesn't have enough memory for this test",
clusterResource.getMemorySize() >= memVars[0] + memVars[1]);
Assume.assumeTrue("The cluster doesn't have enough cores for this test",
clusterResource.getVirtualCores() >= 2);
if (largeContainers) {
memVars[0] = clusterResource.getMemorySize() * 2 / 3;
memVars[0] = memVars[0] - memVars[0] % MIN_ALLOCATION_MB;
masterMemoryString = memVars[0] + "Mi";
memVars[1] = clusterResource.getMemorySize() / 3;
memVars[1] = memVars[1] - memVars[1] % MIN_ALLOCATION_MB;
containerMemoryString = String.valueOf(memVars[1]);
}
String[] args = createArgumentsWithAppName(
"--num_containers",
"2",
"--shell_command",
getListCommand(),
"--master_resources",
"memory=" + masterMemoryString + ",vcores=1",
"--container_resources",
"memory=" + containerMemoryString + ",vcores=1"
);
LOG.info("Initializing DS Client");
setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
Assert.assertTrue(getDSClient().init(args));
LOG.info("Running DS Client");
final AtomicBoolean result = new AtomicBoolean(false);
Thread dsClientRunner = new Thread(() -> {
try {
result.set(getDSClient().run());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
dsClientRunner.start();
try {
yarnClient = YarnClient.createYarnClient();
yarnClient.init(new Configuration(getYarnClusterConfiguration()));
yarnClient.start();
// expecting two containers.
waitForContainersLaunch(yarnClient, 2, appAttemptReportRef,
containersListRef, appAttemptIdRef, thrownExceptionRef);
if (thrownExceptionRef.get() != null) {
Assert.fail(thrownExceptionRef.get().getMessage());
}
ContainerId amContainerId = appAttemptReportRef.get().getAMContainerId();
ContainerReport report = yarnClient.getContainerReport(amContainerId);
Resource masterResource = report.getAllocatedResource();
Assert.assertEquals(memVars[0], masterResource.getMemorySize());
Assert.assertEquals(1, masterResource.getVirtualCores());
for (ContainerReport container : containersListRef.get()) {
if (!container.getContainerId().equals(amContainerId)) {
Resource containerResource = container.getAllocatedResource();
Assert.assertEquals(memVars[1],
containerResource.getMemorySize());
Assert.assertEquals(1, containerResource.getVirtualCores());
}
}
} finally {
LOG.info("Signaling Client to Stop");
if (yarnClient != null) {
LOG.info("Stopping yarnClient service");
yarnClient.stop();
}
}
}
@Test
public void testDSShellWithoutDomain() throws Exception {
baseTestDSShell(false);
}
@Test
public void testDSShellWithoutDomainDefaultFlow() throws Exception {
baseTestDSShell(false, true);
}
@Test
public void testDSShellWithoutDomainCustomizedFlow() throws Exception {
baseTestDSShell(false, false);
}
@Override
protected String[] appendFlowArgsForTestDSShell(String[] args,
boolean defaultFlow) {
if (!defaultFlow) {
String[] flowArgs = {
"--flow_name",
"test_flow_name",
"--flow_version",
"test_flow_version",
"--flow_run_id",
"12345678"
};
args = mergeArgs(args, flowArgs);
}
return args;
}
@Override
protected void checkTimeline(ApplicationId appId, boolean defaultFlow,
boolean haveDomain, ApplicationReport appReport) throws Exception {
LOG.info("Started {}#checkTimeline()", getClass().getCanonicalName());
// For PoC check using the file-based timeline writer (YARN-3264)
String tmpRoot = getTimelineV2StorageDir() + File.separator + "entities"
+ File.separator;
File tmpRootFolder = new File(tmpRoot);
try {
Assert.assertTrue(tmpRootFolder.isDirectory());
String basePath = tmpRoot +
YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator +
UserGroupInformation.getCurrentUser().getShortUserName() +
(defaultFlow ?
File.separator + appReport.getName() + File.separator +
TimelineUtils.DEFAULT_FLOW_VERSION + File.separator +
appReport.getStartTime() + File.separator :
File.separator + "test_flow_name" + File.separator +
"test_flow_version" + File.separator + "12345678" +
File.separator) +
appId.toString();
LOG.info("basePath for appId {}: {}", appId, basePath);
// for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
// Verify DS_APP_ATTEMPT entities posted by the client
// there will be at least one attempt, look for that file
String appTimestampFileName =
String.format("appattempt_%d_000%d_000001%s",
appId.getClusterTimestamp(), appId.getId(),
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath,
"DS_APP_ATTEMPT", appTimestampFileName);
// Check if required events are published and same idprefix is sent for
// on each publish.
verifyEntityForTimeline(dsAppAttemptEntityFile,
DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true);
// to avoid race condition of testcase, at least check 40 times with
// sleep of 50ms
verifyEntityForTimeline(dsAppAttemptEntityFile,
DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true);
// Verify DS_CONTAINER entities posted by the client.
String containerTimestampFileName =
String.format("container_%d_000%d_01_000002.thist",
appId.getClusterTimestamp(), appId.getId());
File dsContainerEntityFile = verifyEntityTypeFileExists(basePath,
"DS_CONTAINER", containerTimestampFileName);
// Check if required events are published and same idprefix is sent for
// on each publish.
verifyEntityForTimeline(dsContainerEntityFile,
DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true);
// to avoid race condition of testcase, at least check 40 times with
// sleep of 50ms.
verifyEntityForTimeline(dsContainerEntityFile,
DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true);
// Verify NM posting container metrics info.
String containerMetricsTimestampFileName =
String.format("container_%d_000%d_01_000001%s",
appId.getClusterTimestamp(), appId.getId(),
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
File containerEntityFile = verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_CONTAINER.toString(),
containerMetricsTimestampFileName);
verifyEntityForTimeline(containerEntityFile,
ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true);
// to avoid race condition of testcase, at least check 40 times with
// sleep of 50ms
verifyEntityForTimeline(containerEntityFile,
ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true);
// Verify RM posting Application life cycle Events are getting published
String appMetricsTimestampFileName =
String.format("application_%d_000%d%s",
appId.getClusterTimestamp(), appId.getId(),
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
File appEntityFile =
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION.toString(),
appMetricsTimestampFileName);
// No need to check idprefix for app.
verifyEntityForTimeline(appEntityFile,
ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false);
// to avoid race condition of testcase, at least check 40 times with
// sleep of 50ms
verifyEntityForTimeline(appEntityFile,
ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false);
// Verify RM posting AppAttempt life cycle Events are getting published
String appAttemptMetricsTimestampFileName =
String.format("appattempt_%d_000%d_000001%s",
appId.getClusterTimestamp(), appId.getId(),
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
File appAttemptEntityFile =
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
appAttemptMetricsTimestampFileName);
verifyEntityForTimeline(appAttemptEntityFile,
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true);
verifyEntityForTimeline(appAttemptEntityFile,
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
} finally {
try {
FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
} catch (Exception ex) {
// the recursive delete can throw an exception when one of the file
// does not exist.
LOG.warn("Exception deleting a file/subDirectory: {}", ex.getMessage());
}
}
}
/**
* Checks the events and idprefix published for an entity.
*
* @param entityFile Entity file.
* @param expectedEvent Expected event Id.
* @param numOfExpectedEvent Number of expected occurrences of expected event
* id.
* @param checkTimes Number of times to check.
* @param sleepTime Sleep time for each iteration.
* @param checkIdPrefix Whether to check idprefix.
* @throws IOException if entity file reading fails.
* @throws InterruptedException if sleep is interrupted.
*/
private void verifyEntityForTimeline(File entityFile, String expectedEvent,
long numOfExpectedEvent, int checkTimes, long sleepTime,
boolean checkIdPrefix) throws Exception {
AtomicReference<Throwable> thrownExceptionRef = new AtomicReference<>(null);
GenericTestUtils.waitFor(() -> {
String strLine;
long actualCount = 0;
long idPrefix = -1;
try (BufferedReader reader =
new BufferedReader(new FileReader(entityFile))) {
while ((strLine = reader.readLine()) != null) {
String entityLine = strLine.trim();
if (entityLine.isEmpty()) {
continue;
}
if (entityLine.contains(expectedEvent)) {
actualCount++;
}
if (expectedEvent.equals(DSEvent.DS_CONTAINER_END.toString())
&& entityLine.contains(expectedEvent)) {
TimelineEntity entity = FileSystemTimelineReaderImpl.
getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
TimelineEvent event = entity.getEvents().pollFirst();
Assert.assertNotNull(event);
Assert.assertTrue("diagnostics",
event.getInfo().containsKey(ApplicationMaster.DIAGNOSTICS));
}
if (checkIdPrefix) {
TimelineEntity entity = FileSystemTimelineReaderImpl.
getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
Assert.assertTrue("Entity ID prefix expected to be > 0",
entity.getIdPrefix() > 0);
if (idPrefix == -1) {
idPrefix = entity.getIdPrefix();
} else {
Assert.assertEquals(
"Entity ID prefix should be same across each publish of "
+ "same entity", idPrefix, entity.getIdPrefix());
}
}
}
} catch (Throwable e) {
LOG.error("Exception is waiting on application report", e);
thrownExceptionRef.set(e);
return true;
}
return (numOfExpectedEvent == actualCount);
}, sleepTime, (checkTimes + 1) * sleepTime);
if (thrownExceptionRef.get() != null) {
Assert.fail("verifyEntityForTimeline failed "
+ thrownExceptionRef.get().getMessage());
}
}
private File verifyEntityTypeFileExists(String basePath, String entityType,
String entityFileName) {
String outputDirPathForEntity =
basePath + File.separator + entityType + File.separator;
LOG.info("verifyEntityTypeFileExists output path for entityType {}: {}",
entityType, outputDirPathForEntity);
File outputDirForEntity = new File(outputDirPathForEntity);
Assert.assertTrue(outputDirForEntity.isDirectory());
String entityFilePath = outputDirPathForEntity + entityFileName;
File entityFile = new File(entityFilePath);
Assert.assertTrue(entityFile.exists());
return entityFile;
}
}

View File

@ -17,74 +17,108 @@
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
public class TestDSWithMultipleNodeManager {
private static final Logger LOG =
LoggerFactory.getLogger(TestDSWithMultipleNodeManager.class);
static final int NUM_NMS = 2;
TestDistributedShell distShellTest;
private static final int NUM_NMS = 2;
@Rule
public TestName name = new TestName();
@Rule
public Timeout globalTimeout =
new Timeout(DistributedShellBaseTest.TEST_TIME_OUT,
TimeUnit.MILLISECONDS);
private DistributedShellBaseTest distShellTest;
private Client dsClient;
@BeforeClass
public static void setupUnitTests() throws Exception {
TestDSTimelineV10.setupUnitTests();
}
@AfterClass
public static void tearDownUnitTests() throws Exception {
TestDSTimelineV10.tearDownUnitTests();
}
@Before
public void setup() throws Exception {
distShellTest = new TestDistributedShell();
distShellTest.setupInternal(NUM_NMS);
distShellTest = new TestDSTimelineV10();
distShellTest.setupInternal(NUM_NMS, new YarnConfiguration());
}
@After
public void tearDown() throws Exception {
distShellTest.tearDown();
if (dsClient != null) {
dsClient.sendStopSignal();
dsClient = null;
}
if (distShellTest != null) {
distShellTest.tearDown();
distShellTest = null;
}
}
private void initializeNodeLabels() throws IOException {
RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
RMContext rmContext = distShellTest.getResourceManager(0).getRMContext();
// Setup node labels
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
Set<String> labels = new HashSet<String>();
Set<String> labels = new HashSet<>();
labels.add("x");
labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(labels);
// Setup queue access to node labels
distShellTest.conf.set(PREFIX + "root.accessible-node-labels", "x");
distShellTest.conf.set(PREFIX + "root.accessible-node-labels.x.capacity",
"100");
distShellTest.conf.set(PREFIX + "root.default.accessible-node-labels", "x");
distShellTest.conf.set(PREFIX
distShellTest.setConfiguration(PREFIX + "root.accessible-node-labels", "x");
distShellTest.setConfiguration(
PREFIX + "root.accessible-node-labels.x.capacity", "100");
distShellTest.setConfiguration(
PREFIX + "root.default.accessible-node-labels", "x");
distShellTest.setConfiguration(PREFIX
+ "root.default.accessible-node-labels.x.capacity", "100");
rmContext.getScheduler().reinitialize(distShellTest.conf, rmContext);
rmContext.getScheduler().reinitialize(distShellTest.getConfiguration(),
rmContext);
// Fetch node-ids from yarn cluster
NodeId[] nodeIds = new NodeId[NUM_NMS];
for (int i = 0; i < NUM_NMS; i++) {
NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i);
NodeManager mgr = distShellTest.getNodeManager(i);
nodeIds[i] = mgr.getNMContext().getNodeId();
}
@ -92,264 +126,312 @@ public class TestDSWithMultipleNodeManager {
labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
}
@Test(timeout=90000)
@Test
public void testDSShellWithNodeLabelExpression() throws Exception {
NMContainerMonitor containerMonitorRunner = null;
initializeNodeLabels();
// Start NMContainerMonitor
NMContainerMonitor mon = new NMContainerMonitor();
Thread t = new Thread(mon);
t.start();
try {
// Start NMContainerMonitor
containerMonitorRunner = new NMContainerMonitor();
containerMonitorRunner.start();
// Submit a job which will sleep for 60 sec
String[] args = {
"--jar",
TestDistributedShell.APPMASTER_JAR,
"--num_containers",
"4",
"--shell_command",
"sleep",
"--shell_args",
"15",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--node_label_expression",
"x"
};
// Submit a job which will sleep for 60 sec
String[] args =
DistributedShellBaseTest.createArguments(() -> generateAppName(),
"--num_containers",
"4",
"--shell_command",
"sleep",
"--shell_args",
"15",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--node_label_expression",
"x"
);
LOG.info("Initializing DS Client");
final Client client =
new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
LOG.info("Initializing DS Client");
dsClient =
new Client(
new Configuration(distShellTest.getYarnClusterConfiguration()));
Assert.assertTrue(dsClient.init(args));
LOG.info("Running DS Client");
boolean result = dsClient.run();
LOG.info("Client run completed. Result={}", result);
t.interrupt();
containerMonitorRunner.stopMonitoring();
// Check maximum number of containers on each NMs
int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
// Check no container allocated on NM[0]
Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
// Check there're some containers allocated on NM[1]
Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
// Check maximum number of containers on each NMs
int[] maxRunningContainersOnNMs =
containerMonitorRunner.getMaxRunningContainersReport();
// Check no container allocated on NM[0]
Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
// Check there are some containers allocated on NM[1]
Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
} finally {
if (containerMonitorRunner != null) {
containerMonitorRunner.stopMonitoring();
containerMonitorRunner.join();
}
}
}
@Test(timeout = 90000)
@Test
public void testDistributedShellWithPlacementConstraint()
throws Exception {
NMContainerMonitor mon = new NMContainerMonitor();
Thread t = new Thread(mon);
t.start();
NMContainerMonitor containerMonitorRunner = null;
String[] args =
DistributedShellBaseTest.createArguments(() -> generateAppName(),
"1",
"--shell_command",
DistributedShellBaseTest.getSleepCommand(15),
"--placement_spec",
"zk(1),NOTIN,NODE,zk:spark(1),NOTIN,NODE,zk"
);
try {
containerMonitorRunner = new NMContainerMonitor();
containerMonitorRunner.start();
String[] args = {
"--jar",
distShellTest.APPMASTER_JAR,
"1",
"--shell_command",
distShellTest.getSleepCommand(15),
"--placement_spec",
"zk(1),NOTIN,NODE,zk:spark(1),NOTIN,NODE,zk"
};
LOG.info("Initializing DS Client");
final Client client =
new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
LOG.info("Initializing DS Client with args {}", Arrays.toString(args));
dsClient =
new Client(
new Configuration(distShellTest.getYarnClusterConfiguration()));
Assert.assertTrue(dsClient.init(args));
LOG.info("Running DS Client");
boolean result = dsClient.run();
LOG.info("Client run completed. Result={}", result);
t.interrupt();
containerMonitorRunner.stopMonitoring();
ConcurrentMap<ApplicationId, RMApp> apps = distShellTest.yarnCluster.
getResourceManager().getRMContext().getRMApps();
RMApp app = apps.values().iterator().next();
RMAppAttempt appAttempt = app.getAppAttempts().values().iterator().next();
NodeId masterNodeId = appAttempt.getMasterContainer().getNodeId();
NodeManager nm1 = distShellTest.yarnCluster.getNodeManager(0);
ConcurrentMap<ApplicationId, RMApp> apps =
distShellTest.getResourceManager().getRMContext().getRMApps();
RMApp app = apps.values().iterator().next();
RMAppAttempt appAttempt = app.getAppAttempts().values().iterator().next();
NodeId masterNodeId = appAttempt.getMasterContainer().getNodeId();
NodeManager nm1 = distShellTest.getNodeManager(0);
int expectedNM1Count = 1;
int expectedNM2Count = 1;
if (nm1.getNMContext().getNodeId().equals(masterNodeId)) {
expectedNM1Count++;
} else {
expectedNM2Count++;
int[] expectedNMsCount = new int[]{1, 1};
if (nm1.getNMContext().getNodeId().equals(masterNodeId)) {
expectedNMsCount[0]++;
} else {
expectedNMsCount[1]++;
}
int[] maxRunningContainersOnNMs =
containerMonitorRunner.getMaxRunningContainersReport();
Assert.assertEquals(expectedNMsCount[0], maxRunningContainersOnNMs[0]);
Assert.assertEquals(expectedNMsCount[1], maxRunningContainersOnNMs[1]);
} finally {
if (containerMonitorRunner != null) {
containerMonitorRunner.stopMonitoring();
containerMonitorRunner.join();
}
}
int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]);
Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
}
@Test(timeout = 90000)
@Test
public void testDistributedShellWithAllocationTagNamespace()
throws Exception {
NMContainerMonitor mon = new NMContainerMonitor();
Thread monitorThread = new Thread(mon);
monitorThread.start();
NMContainerMonitor containerMonitorRunner = null;
Client clientB = null;
YarnClient yarnClient = null;
String[] argsA = {
"--jar",
distShellTest.APPMASTER_JAR,
"--shell_command",
distShellTest.getSleepCommand(30),
"--placement_spec",
"bar(1),notin,node,bar"
};
final Client clientA =
new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
clientA.init(argsA);
final AtomicBoolean resultA = new AtomicBoolean(false);
Thread t = new Thread() {
public void run() {
String[] argsA =
DistributedShellBaseTest.createArguments(() -> generateAppName("001"),
"--shell_command",
DistributedShellBaseTest.getSleepCommand(30),
"--placement_spec",
"bar(1),notin,node,bar"
);
String[] argsB =
DistributedShellBaseTest.createArguments(() -> generateAppName("002"),
"1",
"--shell_command",
DistributedShellBaseTest.getListCommand(),
"--placement_spec",
"foo(3),notin,node,all/bar"
);
try {
containerMonitorRunner = new NMContainerMonitor();
containerMonitorRunner.start();
dsClient =
new Client(
new Configuration(distShellTest.getYarnClusterConfiguration()));
dsClient.init(argsA);
Thread dsClientRunner = new Thread(() -> {
try {
resultA.set(clientA.run());
dsClient.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
dsClientRunner.start();
NodeId taskContainerNodeIdA;
ConcurrentMap<ApplicationId, RMApp> apps;
AtomicReference<RMApp> appARef = new AtomicReference<>(null);
AtomicReference<NodeId> masterContainerNodeIdARef =
new AtomicReference<>(null);
int[] expectedNMCounts = new int[]{0, 0};
waitForExpectedNMsCount(expectedNMCounts, appARef,
masterContainerNodeIdARef);
NodeId nodeA = distShellTest.getNodeManager(0).getNMContext().
getNodeId();
NodeId nodeB = distShellTest.getNodeManager(1).getNMContext().
getNodeId();
Assert.assertEquals(2, (expectedNMCounts[0] + expectedNMCounts[1]));
if (expectedNMCounts[0] != expectedNMCounts[1]) {
taskContainerNodeIdA = masterContainerNodeIdARef.get();
} else {
taskContainerNodeIdA =
masterContainerNodeIdARef.get().equals(nodeA) ? nodeB : nodeA;
}
};
t.start();
NodeId masterContainerNodeIdA;
NodeId taskContainerNodeIdA;
ConcurrentMap<ApplicationId, RMApp> apps;
RMApp appA;
int expectedNM1Count = 0;
int expectedNM2Count = 0;
while (true) {
if ((expectedNM1Count + expectedNM2Count) < 2) {
expectedNM1Count = distShellTest.yarnCluster.getNodeManager(0).
getNMContext().getContainers().size();
expectedNM2Count = distShellTest.yarnCluster.getNodeManager(1).
getNMContext().getContainers().size();
continue;
clientB =
new Client(
new Configuration(distShellTest.getYarnClusterConfiguration()));
clientB.init(argsB);
Assert.assertTrue(clientB.run());
containerMonitorRunner.stopMonitoring();
apps = distShellTest.getResourceManager().getRMContext().getRMApps();
Iterator<RMApp> it = apps.values().iterator();
RMApp appB = it.next();
if (appARef.get().equals(appB)) {
appB = it.next();
}
apps = distShellTest.yarnCluster.getResourceManager().getRMContext().
getRMApps();
if (apps.isEmpty()) {
Thread.sleep(10);
continue;
LOG.info("Allocation Tag NameSpace Applications are={} and {}",
appARef.get().getApplicationId(), appB.getApplicationId());
RMAppAttempt appAttemptB =
appB.getAppAttempts().values().iterator().next();
NodeId masterContainerNodeIdB =
appAttemptB.getMasterContainer().getNodeId();
if (nodeA.equals(masterContainerNodeIdB)) {
expectedNMCounts[0]++;
} else {
expectedNMCounts[1]++;
}
appA = apps.values().iterator().next();
if (appA.getAppAttempts().isEmpty()) {
Thread.sleep(10);
continue;
if (nodeA.equals(taskContainerNodeIdA)) {
expectedNMCounts[1] += 3;
} else {
expectedNMCounts[0] += 3;
}
RMAppAttempt appAttemptA = appA.getAppAttempts().values().iterator().
next();
if (appAttemptA.getMasterContainer() == null) {
Thread.sleep(10);
continue;
int[] maxRunningContainersOnNMs =
containerMonitorRunner.getMaxRunningContainersReport();
Assert.assertEquals(expectedNMCounts[0], maxRunningContainersOnNMs[0]);
Assert.assertEquals(expectedNMCounts[1], maxRunningContainersOnNMs[1]);
try {
yarnClient = YarnClient.createYarnClient();
yarnClient.init(
new Configuration(distShellTest.getYarnClusterConfiguration()));
yarnClient.start();
yarnClient.killApplication(appARef.get().getApplicationId());
} catch (Exception e) {
// Ignore Exception while killing a job
LOG.warn("Exception killing the job: {}", e.getMessage());
}
} finally {
if (yarnClient != null) {
yarnClient.stop();
}
if (clientB != null) {
clientB.sendStopSignal();
}
if (containerMonitorRunner != null) {
containerMonitorRunner.stopMonitoring();
containerMonitorRunner.join();
}
masterContainerNodeIdA = appAttemptA.getMasterContainer().getNodeId();
break;
}
NodeId nodeA = distShellTest.yarnCluster.getNodeManager(0).getNMContext().
getNodeId();
NodeId nodeB = distShellTest.yarnCluster.getNodeManager(1).getNMContext().
getNodeId();
Assert.assertEquals(2, (expectedNM1Count + expectedNM2Count));
if (expectedNM1Count != expectedNM2Count) {
taskContainerNodeIdA = masterContainerNodeIdA;
} else {
taskContainerNodeIdA = masterContainerNodeIdA.equals(nodeA) ? nodeB :
nodeA;
}
String[] argsB = {
"--jar",
distShellTest.APPMASTER_JAR,
"1",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--placement_spec",
"foo(3),notin,node,all/bar"
};
final Client clientB = new Client(new Configuration(distShellTest.
yarnCluster.getConfig()));
clientB.init(argsB);
boolean resultB = clientB.run();
Assert.assertTrue(resultB);
monitorThread.interrupt();
apps = distShellTest.yarnCluster.getResourceManager().getRMContext().
getRMApps();
Iterator<RMApp> it = apps.values().iterator();
RMApp appB = it.next();
if (appA.equals(appB)) {
appB = it.next();
}
LOG.info("Allocation Tag NameSpace Applications are=" + appA.
getApplicationId() + " and " + appB.getApplicationId());
RMAppAttempt appAttemptB = appB.getAppAttempts().values().iterator().
next();
NodeId masterContainerNodeIdB = appAttemptB.getMasterContainer().
getNodeId();
if (nodeA.equals(masterContainerNodeIdB)) {
expectedNM1Count += 1;
} else {
expectedNM2Count += 1;
}
if (nodeA.equals(taskContainerNodeIdA)) {
expectedNM2Count += 3;
} else {
expectedNM1Count += 3;
}
int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]);
Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
try {
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(new Configuration(distShellTest.yarnCluster.
getConfig()));
yarnClient.start();
yarnClient.killApplication(appA.getApplicationId());
} catch (Exception e) {
// Ignore Exception while killing a job
}
}
protected String generateAppName() {
return generateAppName(null);
}
protected String generateAppName(String postFix) {
return name.getMethodName().replaceFirst("test", "")
.concat(postFix == null ? "" : "-" + postFix);
}
private void waitForExpectedNMsCount(int[] expectedNMCounts,
AtomicReference<RMApp> appARef,
AtomicReference<NodeId> masterContainerNodeIdARef) throws Exception {
GenericTestUtils.waitFor(() -> {
if ((expectedNMCounts[0] + expectedNMCounts[1]) < 2) {
expectedNMCounts[0] =
distShellTest.getNodeManager(0).getNMContext()
.getContainers().size();
expectedNMCounts[1] =
distShellTest.getNodeManager(1).getNMContext()
.getContainers().size();
return false;
}
ConcurrentMap<ApplicationId, RMApp> appIDsMap =
distShellTest.getResourceManager().getRMContext().getRMApps();
if (appIDsMap.isEmpty()) {
return false;
}
appARef.set(appIDsMap.values().iterator().next());
if (appARef.get().getAppAttempts().isEmpty()) {
return false;
}
RMAppAttempt appAttemptA =
appARef.get().getAppAttempts().values().iterator().next();
if (appAttemptA.getMasterContainer() == null) {
return false;
}
masterContainerNodeIdARef.set(
appAttemptA.getMasterContainer().getNodeId());
return true;
}, 10, 60000);
}
/**
* Monitor containers running on NMs
* Monitor containers running on NMs.
*/
class NMContainerMonitor implements Runnable {
class NMContainerMonitor extends Thread {
// The interval of milliseconds of sampling (500ms)
final static int SAMPLING_INTERVAL_MS = 500;
private final static int SAMPLING_INTERVAL_MS = 500;
// The maximum number of containers running on each NMs
int[] maxRunningContainersOnNMs = new int[NUM_NMS];
private final int[] maxRunningContainersOnNMs = new int[NUM_NMS];
private final Object quitSignal = new Object();
private volatile boolean isRunning = true;
@Override
public void run() {
while (true) {
while (isRunning) {
for (int i = 0; i < NUM_NMS; i++) {
int nContainers =
distShellTest.yarnCluster.getNodeManager(i).getNMContext()
distShellTest.getNodeManager(i).getNMContext()
.getContainers().size();
if (nContainers > maxRunningContainersOnNMs[i]) {
maxRunningContainersOnNMs[i] = nContainers;
}
}
try {
Thread.sleep(SAMPLING_INTERVAL_MS);
} catch (InterruptedException e) {
e.printStackTrace();
break;
synchronized (quitSignal) {
try {
if (!isRunning) {
break;
}
quitSignal.wait(SAMPLING_INTERVAL_MS);
} catch (InterruptedException e) {
LOG.warn("NMContainerMonitor interrupted");
isRunning = false;
break;
}
}
}
}
@ -357,5 +439,15 @@ public class TestDSWithMultipleNodeManager {
public int[] getMaxRunningContainersReport() {
return maxRunningContainersOnNMs;
}
public void stopMonitoring() {
if (!isRunning) {
return;
}
synchronized (quitSignal) {
isRunning = false;
quitSignal.notifyAll();
}
}
}
}