YARN-3792. Test case failures in TestDistributedShell and some issue fixes related to ATSV2 (Naganarasimha G R via sjlee)

(cherry picked from commit 84f37f1c7eefec6d139cbf091c50d6c06f734323)
This commit is contained in:
Sangjin Lee 2015-06-22 20:47:56 -07:00
parent 92d90c3a24
commit 22e7ae5771
10 changed files with 77 additions and 53 deletions

View File

@ -497,7 +497,7 @@ public boolean init(String[] args) throws ParseException {
}
if (cliParser.hasOption("flow_run_id")) {
try {
flowRunId = Long.valueOf(cliParser.getOptionValue("flow_run_id"));
flowRunId = Long.parseLong(cliParser.getOptionValue("flow_run_id"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Flow run is not a valid long value", e);

View File

@ -60,6 +60,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@ -129,7 +130,6 @@ protected void setupInternal(int numNodeManager) throws Exception {
private void setupInternal(int numNodeManager, float timelineVersion)
throws Exception {
LOG.info("Starting up YARN cluster");
conf = new YarnConfiguration();
@ -140,7 +140,6 @@ private void setupInternal(int numNodeManager, float timelineVersion)
boolean enableATSServer = true;
// disable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
@ -155,7 +154,9 @@ private void setupInternal(int numNodeManager, float timelineVersion)
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
true);
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
true);
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, false);
// ATS version specific settings
if (timelineVersion == 1.0f) {
@ -180,6 +181,9 @@ private void setupInternal(int numNodeManager, float timelineVersion)
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
+ ".class", PerNodeTimelineCollectorsAuxService.class.getName());
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
false);
} else {
Assert.fail("Wrong timeline version number: " + timelineVersion);
}
@ -187,7 +191,7 @@ private void setupInternal(int numNodeManager, float timelineVersion)
if (yarnCluster == null) {
yarnCluster =
new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
numNodeManager, 1, 1, enableATSServer);
numNodeManager, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
@ -390,13 +394,15 @@ public void run() {
if (checkHostname(appReport.getHost()) && appReport.getRpcPort() == -1) {
verified = true;
}
if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED) {
if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
&& appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
break;
}
}
Assert.assertTrue(errorMessage, verified);
t.join();
LOG.info("Client run completed. Result=" + result);
LOG.info("Client run completed for testDSShell. Result=" + result);
Assert.assertTrue(result.get());
if (timelineVersionWatcher.getTimelineVersion() == 1.5f) {
@ -477,9 +483,9 @@ private void checkTimelineV1(boolean haveDomain) throws Exception {
}
}
private void checkTimelineV2(
boolean haveDomain, ApplicationId appId, boolean defaultFlow)
throws Exception {
private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
boolean defaultFlow) throws Exception {
LOG.info("Started checkTimelineV2 ");
// For PoC check in /tmp/timeline_service_data YARN-3264
String tmpRoot =
FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
@ -530,12 +536,29 @@ private void checkTimelineV2(
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION.toString(),
appMetricsTimestampFileName);
verifyStringExistsSpecifiedTimes(appEntityFile,
ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1,
"Application created event should be published atleast once");
verifyStringExistsSpecifiedTimes(appEntityFile,
ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1,
"Application finished event should be published atleast once");
Assert.assertEquals(
"Application created event should be published atleast once",
1,
getNumOfStringOccurences(appEntityFile,
ApplicationMetricsConstants.CREATED_EVENT_TYPE));
// to avoid race condition of testcase, atleast check 4 times with sleep
// of 500ms
long numOfStringOccurences = 0;
for (int i = 0; i < 4; i++) {
numOfStringOccurences =
getNumOfStringOccurences(appEntityFile,
ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
if (numOfStringOccurences > 0) {
break;
} else {
Thread.sleep(500l);
}
}
Assert.assertEquals(
"Application finished event should be published atleast once",
1,
numOfStringOccurences);
// Verify RM posting AppAttempt life cycle Events are getting published
String appAttemptMetricsTimestampFileName =
@ -546,12 +569,17 @@ private void checkTimelineV2(
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
appAttemptMetricsTimestampFileName);
verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1,
"AppAttempt register event should be published atleast once");
verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1,
"AppAttempt finished event should be published atleast once");
Assert.assertEquals(
"AppAttempt register event should be published atleast once",
1,
getNumOfStringOccurences(appAttemptEntityFile,
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE));
Assert.assertEquals(
"AppAttempt finished event should be published atleast once",
1,
getNumOfStringOccurences(appAttemptEntityFile,
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE));
} finally {
FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
}
@ -570,8 +598,7 @@ private File verifyEntityTypeFileExists(String basePath, String entityType,
return entityFile;
}
private void verifyStringExistsSpecifiedTimes(File entityFile,
String searchString, long expectedNumOfTimes, String errorMsg)
private long getNumOfStringOccurences(File entityFile, String searchString)
throws IOException {
BufferedReader reader = null;
String strLine;
@ -585,7 +612,7 @@ private void verifyStringExistsSpecifiedTimes(File entityFile,
} finally {
reader.close();
}
Assert.assertEquals(errorMsg, expectedNumOfTimes, actualCount);
return actualCount;
}
/**
@ -1261,4 +1288,3 @@ private int verifyContainerLog(int containerNum,
return numOfWords;
}
}

View File

@ -30,7 +30,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import com.google.common.collect.ImmutableMap;
@ -40,7 +42,7 @@ public class TestDistributedShellWithNodeLabels {
static final int NUM_NMS = 2;
TestDistributedShell distShellTest;
@Before
public void setup() throws Exception {
distShellTest = new TestDistributedShell();

View File

@ -418,6 +418,14 @@ public void putObjects(String path, MultivaluedMap<String, String> params,
// timelineServiceAddress could haven't be initialized yet
// or stale (only for new timeline service)
int retries = pollTimelineServiceAddress(this.maxServiceRetries);
if (timelineServiceAddress == null) {
String errMessage = "TimelineClient has reached to max retry times : "
+ this.maxServiceRetries
+ ", but failed to fetch timeline service address. Please verify"
+ " Timeline Auxillary Service is configured in all the NMs";
LOG.error(errMessage);
throw new YarnException(errMessage);
}
// timelineServiceAddress could be stale, add retry logic here.
boolean needRetry = true;

View File

@ -130,11 +130,11 @@ public ApplicationImpl(Dispatcher dispatcher, String user, String flowId,
context, -1);
Configuration conf = context.getConf();
if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
createAndStartTimelienClient(conf);
createAndStartTimelineClient(conf);
}
}
private void createAndStartTimelienClient(Configuration conf) {
private void createAndStartTimelineClient(Configuration conf) {
// create and start timeline client
this.timelineClient = TimelineClient.createTimelineClient(appId);
timelineClient.init(conf);

View File

@ -96,11 +96,8 @@ public class ContainersMonitorImpl extends AbstractService implements
// For posting entities in new timeline service in a non-blocking way
// TODO replace with event loop in TimelineClient.
private static ExecutorService threadPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
private static ExecutorService threadPool;
@Private
public static enum ContainerMetric {
CPU, MEMORY
@ -225,6 +222,10 @@ protected void serviceInit(Configuration conf) throws Exception {
if (publishContainerMetricsToTimelineService) {
LOG.info("NodeManager has been configured to publish container " +
"metrics to Timeline Service V2.");
threadPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
} else {
LOG.warn("NodeManager has not been configured to publish container " +
"metrics to Timeline Service V2.");
@ -280,6 +281,9 @@ protected void serviceStop() throws Exception {
// TODO remove threadPool after adding non-blocking call in TimelineClient
private static void shutdownAndAwaitTermination() {
if (threadPool == null) {
return;
}
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
@ -689,7 +693,6 @@ public void run() {
timelineClient.putEntities(entity);
} catch (IOException|YarnException e) {
LOG.error("putEntityNonBlocking get failed: " + e);
throw new RuntimeException(e.toString());
}
}
};

View File

@ -56,7 +56,7 @@ public void postPut(ApplicationId appId, TimelineCollector collector) {
if (parts.length != 2 || parts[1].isEmpty()) {
continue;
}
switch (parts[0]) {
switch (parts[0].toUpperCase()) {
case TimelineUtils.FLOW_NAME_TAG_PREFIX:
collector.getTimelineEntityContext().setFlowName(parts[1]);
break;

View File

@ -48,21 +48,11 @@
import com.google.common.annotations.VisibleForTesting;
/**
*
* It is a singleton, and instances should be obtained via
* {@link #getInstance()}.
*
*/
@Private
@Unstable
public class NodeTimelineCollectorManager extends TimelineCollectorManager {
private static final Log LOG =
LogFactory.getLog(NodeTimelineCollectorManager.class);
private static final NodeTimelineCollectorManager INSTANCE =
new NodeTimelineCollectorManager();
// REST server for this collector manager
private HttpServer2 timelineRestServer;
@ -73,10 +63,6 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
static NodeTimelineCollectorManager getInstance() {
return INSTANCE;
}
@VisibleForTesting
protected NodeTimelineCollectorManager() {
super(NodeTimelineCollectorManager.class.getName());

View File

@ -56,8 +56,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
private final NodeTimelineCollectorManager collectorManager;
public PerNodeTimelineCollectorsAuxService() {
// use the same singleton
this(NodeTimelineCollectorManager.getInstance());
this(new NodeTimelineCollectorManager());
}
@VisibleForTesting PerNodeTimelineCollectorsAuxService(

View File

@ -128,7 +128,7 @@ public boolean remove(ApplicationId appId) {
postRemove(appId, collector);
// stop the service to do clean up
collector.stop();
LOG.info("the collector service for " + appId + " was removed");
LOG.info("The collector service for " + appId + " was removed");
}
return collector != null;
}