From cc0ef524d9f4853ebba71480801682c3fd40dd80 Mon Sep 17 00:00:00 2001 From: bibinchundatt Date: Thu, 14 Feb 2019 23:00:56 +0530 Subject: [PATCH] YARN-9293. Optimize MockAMLauncher event handling. Contributed by Bibin A Chundatt. --- .../org/apache/hadoop/yarn/sls/SLSRunner.java | 10 +++-- .../yarn/sls/appmaster/AMSimulator.java | 9 +++- .../yarn/sls/appmaster/MRAMSimulator.java | 10 +++-- .../yarn/sls/appmaster/StreamAMSimulator.java | 6 ++- .../sls/resourcemanager/MockAMLauncher.java | 44 +++++++++---------- .../yarn/sls/appmaster/TestAMSimulator.java | 5 ++- 6 files changed, 50 insertions(+), 34 deletions(-) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 8a522fed82b..44aaf7484e4 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -55,6 +55,7 @@ import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -105,6 +106,7 @@ public class SLSRunner extends Configured implements Tool { // AM simulator private int AM_ID; private Map amMap; + private Map appIdAMSim; private Set trackedApps; private Map amClassMap; private static int remainingApps = 0; @@ -162,7 +164,7 @@ public class SLSRunner extends Configured implements Tool { queueAppNumMap = new HashMap<>(); amMap = new ConcurrentHashMap<>(); amClassMap = new HashMap<>(); - + appIdAMSim = new ConcurrentHashMap<>(); // runner configuration setConf(tempConf); @@ -269,7 +271,7 @@ public class SLSRunner extends Configured implements Tool { rm = new ResourceManager() { @Override protected ApplicationMasterLauncher createAMLauncher() { - return new MockAMLauncher(se, this.rmContext, amMap); + return new MockAMLauncher(se, this.rmContext, appIdAMSim); } }; @@ -551,7 +553,7 @@ public class SLSRunner extends Configured implements Tool { try { createAMForJob(job, baselineTimeMS); } catch (Exception e) { - LOG.error("Failed to create an AM: {}", e.getMessage()); + LOG.error("Failed to create an AM", e); } job = reader.getNext(); @@ -763,7 +765,7 @@ public class SLSRunner extends Configured implements Tool { AM_ID++; amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, - runner.getStartTimeMS(), amContainerResource, params); + runner.getStartTimeMS(), amContainerResource, params, appIdAMSim); if(reservationId != null) { // if we have a ReservationId, delegate reservation creation to // AMSim (reservation shape is impl specific) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 8e1c256c636..aa6cf8932fc 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -114,6 +114,8 @@ public abstract class AMSimulator extends TaskRunner.Task { private ReservationSubmissionRequest reservationRequest; + private Map appIdToAMSim; + public AMSimulator() { this.responseQueue = new LinkedBlockingQueue<>(); } @@ -123,7 +125,8 @@ public abstract class AMSimulator extends TaskRunner.Task { List containerList, ResourceManager resourceManager, SLSRunner slsRunnner, long startTime, long finishTime, String simUser, String simQueue, boolean tracked, String oldApp, long baseTimeMS, - Resource amResource, Map params) { + Resource amResource, Map params, + Map appIdAMSim) { super.init(startTime, startTime + 1000000L * heartbeatInterval, heartbeatInterval); this.user = simUser; @@ -136,6 +139,7 @@ public abstract class AMSimulator extends TaskRunner.Task { this.traceStartTimeMS = startTime; this.traceFinishTimeMS = finishTime; this.amContainerResource = amResource; + this.appIdToAMSim = appIdAMSim; } /** @@ -159,6 +163,9 @@ public abstract class AMSimulator extends TaskRunner.Task { // submit application, waiting until ACCEPTED submitApp(reservationId); + // add submitted app to mapping + appIdToAMSim.put(appId, this); + // track app metrics trackApp(); } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index 6f0f85ff904..2779860e796 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -34,6 +34,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -126,10 +127,11 @@ public class MRAMSimulator extends AMSimulator { List containerList, ResourceManager rm, SLSRunner se, long traceStartTime, long traceFinishTime, String user, String queue, boolean isTracked, String oldAppId, long baselineStartTimeMS, - Resource amContainerResource, Map params) { - super.init(heartbeatInterval, containerList, rm, se, - traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, - baselineStartTimeMS, amContainerResource, params); + Resource amContainerResource, Map params, + Map appIdAMSim) { + super.init(heartbeatInterval, containerList, rm, se, traceStartTime, + traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, + amContainerResource, params, appIdAMSim); amtype = "mapreduce"; // get map/reduce tasks diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java index b41f5f20296..c70dd3f4a43 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java @@ -24,6 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -96,10 +97,11 @@ public class StreamAMSimulator extends AMSimulator { List containerList, ResourceManager rm, SLSRunner se, long traceStartTime, long traceFinishTime, String user, String queue, boolean isTracked, String oldAppId, long baselineStartTimeMS, - Resource amContainerResource, Map params) { + Resource amContainerResource, Map params, + Map appIdAMSim) { super.init(heartbeatInterval, containerList, rm, se, traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, - amContainerResource, params); + amContainerResource, params, appIdAMSim); amtype = "stream"; allStreams.addAll(containerList); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java index b4ffb617c65..b070e18ffe0 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java @@ -45,13 +45,14 @@ public class MockAMLauncher extends ApplicationMasterLauncher private static final Log LOG = LogFactory.getLog( MockAMLauncher.class); - Map amMap; + private Map appIdAMSim; + SLSRunner se; public MockAMLauncher(SLSRunner se, RMContext rmContext, - Map amMap) { + Map appIdAMSim) { super(rmContext); - this.amMap = amMap; + this.appIdAMSim = appIdAMSim; this.se = se; } @@ -86,30 +87,28 @@ public class MockAMLauncher extends ApplicationMasterLauncher event.getAppAttempt().getAppAttemptId().getApplicationId(); // find AMSimulator - for (AMSimulator ams : amMap.values()) { - if (ams.getApplicationId() != null && ams.getApplicationId().equals( - appId)) { - try { - Container amContainer = event.getAppAttempt().getMasterContainer(); + AMSimulator ams = appIdAMSim.get(appId); + if (ams != null) { + try { + Container amContainer = event.getAppAttempt().getMasterContainer(); - setupAMRMToken(event.getAppAttempt()); + setupAMRMToken(event.getAppAttempt()); - // Notify RMAppAttempt to change state - super.context.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(), - RMAppAttemptEventType.LAUNCHED)); + // Notify RMAppAttempt to change state + super.context.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(), + RMAppAttemptEventType.LAUNCHED)); - ams.notifyAMContainerLaunched( - event.getAppAttempt().getMasterContainer()); - LOG.info("Notify AM launcher launched:" + amContainer.getId()); + ams.notifyAMContainerLaunched( + event.getAppAttempt().getMasterContainer()); + LOG.info("Notify AM launcher launched:" + amContainer.getId()); - se.getNmMap().get(amContainer.getNodeId()) - .addNewContainer(amContainer, 100000000L); + se.getNmMap().get(amContainer.getNodeId()) + .addNewContainer(amContainer, 100000000L); - return; - } catch (Exception e) { - throw new YarnRuntimeException(e); - } + return; + } catch (Exception e) { + throw new YarnRuntimeException(e); } } @@ -117,4 +116,5 @@ public class MockAMLauncher extends ApplicationMasterLauncher "Didn't find any AMSimulator for applicationId=" + appId); } } + } diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index bc8ea70e46b..e5f7c809fd9 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.sls.appmaster; import com.codahale.metrics.MetricRegistry; +import java.util.HashMap; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -139,8 +141,9 @@ public class TestAMSimulator { String appId = "app1"; String queue = "default"; List containers = new ArrayList<>(); + HashMap map = new HashMap<>(); app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, - appId, 0, SLSConfiguration.getAMContainerResource(conf), null); + appId, 0, SLSConfiguration.getAMContainerResource(conf), null, map); app.firstStep(); verifySchedulerMetrics(appId);