YARN-9293. Optimize MockAMLauncher event handling. Contributed by Bibin A Chundatt.

This commit is contained in:
bibinchundatt 2019-02-14 22:56:52 +05:30
parent 0d7a5ac5f5
commit 134ae8fc80
6 changed files with 52 additions and 37 deletions

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeLabel;
@ -113,6 +114,7 @@ public class SLSRunner extends Configured implements Tool {
// AM simulator // AM simulator
private int AM_ID; private int AM_ID;
private Map<String, AMSimulator> amMap; private Map<String, AMSimulator> amMap;
private Map<ApplicationId, AMSimulator> appIdAMSim;
private Set<String> trackedApps; private Set<String> trackedApps;
private Map<String, Class> amClassMap; private Map<String, Class> amClassMap;
private static int remainingApps = 0; private static int remainingApps = 0;
@ -170,7 +172,7 @@ public class SLSRunner extends Configured implements Tool {
queueAppNumMap = new HashMap<>(); queueAppNumMap = new HashMap<>();
amMap = new ConcurrentHashMap<>(); amMap = new ConcurrentHashMap<>();
amClassMap = new HashMap<>(); amClassMap = new HashMap<>();
appIdAMSim = new ConcurrentHashMap<>();
// runner configuration // runner configuration
setConf(tempConf); setConf(tempConf);
@ -277,7 +279,7 @@ public class SLSRunner extends Configured implements Tool {
rm = new ResourceManager() { rm = new ResourceManager() {
@Override @Override
protected ApplicationMasterLauncher createAMLauncher() { protected ApplicationMasterLauncher createAMLauncher() {
return new MockAMLauncher(se, this.rmContext, amMap); return new MockAMLauncher(se, this.rmContext, appIdAMSim);
} }
}; };
@ -587,7 +589,7 @@ public class SLSRunner extends Configured implements Tool {
try { try {
createAMForJob(job, baselineTimeMS); createAMForJob(job, baselineTimeMS);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to create an AM: {}", e.getMessage()); LOG.error("Failed to create an AM", e);
} }
job = reader.getNext(); job = reader.getNext();
@ -808,7 +810,8 @@ public class SLSRunner extends Configured implements Tool {
AM_ID++; AM_ID++;
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
runner.getStartTimeMS(), amContainerResource, labelExpr, params); runner.getStartTimeMS(), amContainerResource, labelExpr, params,
appIdAMSim);
if(reservationId != null) { if(reservationId != null) {
// if we have a ReservationId, delegate reservation creation to // if we have a ReservationId, delegate reservation creation to
// AMSim (reservation shape is impl specific) // AMSim (reservation shape is impl specific)

View File

@ -116,6 +116,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
private ReservationSubmissionRequest reservationRequest; private ReservationSubmissionRequest reservationRequest;
private Map<ApplicationId, AMSimulator> appIdToAMSim;
public AMSimulator() { public AMSimulator() {
this.responseQueue = new LinkedBlockingQueue<>(); this.responseQueue = new LinkedBlockingQueue<>();
} }
@ -125,8 +127,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
List<ContainerSimulator> containerList, ResourceManager resourceManager, List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser, SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp, long baseTimeMS, String simQueue, boolean tracked, String oldApp, long baseTimeMS,
Resource amResource, String nodeLabelExpr, Resource amResource, String nodeLabelExpr, Map<String, String> params,
Map<String, String> params) { Map<ApplicationId, AMSimulator> appIdAMSim) {
super.init(startTime, startTime + 1000000L * heartbeatInterval, super.init(startTime, startTime + 1000000L * heartbeatInterval,
heartbeatInterval); heartbeatInterval);
this.user = simUser; this.user = simUser;
@ -140,6 +142,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
this.traceFinishTimeMS = finishTime; this.traceFinishTimeMS = finishTime;
this.amContainerResource = amResource; this.amContainerResource = amResource;
this.nodeLabelExpression = nodeLabelExpr; this.nodeLabelExpression = nodeLabelExpr;
this.appIdToAMSim = appIdAMSim;
} }
/** /**
@ -163,6 +166,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
// submit application, waiting until ACCEPTED // submit application, waiting until ACCEPTED
submitApp(reservationId); submitApp(reservationId);
// add submitted app to mapping
appIdToAMSim.put(appId, this);
// track app metrics // track app metrics
trackApp(); trackApp();
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -127,10 +128,10 @@ public class MRAMSimulator extends AMSimulator {
long traceStartTime, long traceFinishTime, String user, String queue, long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS, boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, String nodeLabelExpr, Resource amContainerResource, String nodeLabelExpr,
Map<String, String> params) { Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
super.init(heartbeatInterval, containerList, rm, se, super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
baselineStartTimeMS, amContainerResource, nodeLabelExpr, params); amContainerResource, nodeLabelExpr, params, appIdAMSim);
amtype = "mapreduce"; amtype = "mapreduce";
// get map/reduce tasks // get map/reduce tasks

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -97,10 +98,10 @@ public class StreamAMSimulator extends AMSimulator {
long traceStartTime, long traceFinishTime, String user, String queue, long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS, boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, String nodeLabelExpr, Resource amContainerResource, String nodeLabelExpr,
Map<String, String> params) { Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
super.init(heartbeatInterval, containerList, rm, se, traceStartTime, super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
amContainerResource, nodeLabelExpr, params); amContainerResource, nodeLabelExpr, params, appIdAMSim);
amtype = "stream"; amtype = "stream";
allStreams.addAll(containerList); allStreams.addAll(containerList);

View File

@ -45,13 +45,14 @@ public class MockAMLauncher extends ApplicationMasterLauncher
private static final Logger LOG = LoggerFactory.getLogger( private static final Logger LOG = LoggerFactory.getLogger(
MockAMLauncher.class); MockAMLauncher.class);
Map<String, AMSimulator> amMap; private Map<ApplicationId, AMSimulator> appIdAMSim;
SLSRunner se; SLSRunner se;
public MockAMLauncher(SLSRunner se, RMContext rmContext, public MockAMLauncher(SLSRunner se, RMContext rmContext,
Map<String, AMSimulator> amMap) { Map<ApplicationId, AMSimulator> appIdAMSim) {
super(rmContext); super(rmContext);
this.amMap = amMap; this.appIdAMSim = appIdAMSim;
this.se = se; this.se = se;
} }
@ -86,30 +87,28 @@ public class MockAMLauncher extends ApplicationMasterLauncher
event.getAppAttempt().getAppAttemptId().getApplicationId(); event.getAppAttempt().getAppAttemptId().getApplicationId();
// find AMSimulator // find AMSimulator
for (AMSimulator ams : amMap.values()) { AMSimulator ams = appIdAMSim.get(appId);
if (ams.getApplicationId() != null && ams.getApplicationId().equals( if (ams != null) {
appId)) { try {
try { Container amContainer = event.getAppAttempt().getMasterContainer();
Container amContainer = event.getAppAttempt().getMasterContainer();
setupAMRMToken(event.getAppAttempt()); setupAMRMToken(event.getAppAttempt());
// Notify RMAppAttempt to change state // Notify RMAppAttempt to change state
super.context.getDispatcher().getEventHandler().handle( super.context.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(), new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
RMAppAttemptEventType.LAUNCHED)); RMAppAttemptEventType.LAUNCHED));
ams.notifyAMContainerLaunched( ams.notifyAMContainerLaunched(
event.getAppAttempt().getMasterContainer()); event.getAppAttempt().getMasterContainer());
LOG.info("Notify AM launcher launched:" + amContainer.getId()); LOG.info("Notify AM launcher launched:" + amContainer.getId());
se.getNmMap().get(amContainer.getNodeId()) se.getNmMap().get(amContainer.getNodeId())
.addNewContainer(amContainer, 100000000L); .addNewContainer(amContainer, 100000000L);
return; return;
} catch (Exception e) { } catch (Exception e) {
throw new YarnRuntimeException(e); throw new YarnRuntimeException(e);
}
} }
} }
@ -117,4 +116,5 @@ public class MockAMLauncher extends ApplicationMasterLauncher
"Didn't find any AMSimulator for applicationId=" + appId); "Didn't find any AMSimulator for applicationId=" + appId);
} }
} }
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.sls.appmaster; package org.apache.hadoop.yarn.sls.appmaster;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import java.util.HashMap;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
@ -144,8 +145,10 @@ public class TestAMSimulator {
String appId = "app1"; String appId = "app1";
String queue = "default"; String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>(); List<ContainerSimulator> containers = new ArrayList<>();
HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null); appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null,
map);
app.firstStep(); app.firstStep();
verifySchedulerMetrics(appId); verifySchedulerMetrics(appId);
@ -169,9 +172,10 @@ public class TestAMSimulator {
String appId = "app1"; String appId = "app1";
String queue = "default"; String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>(); List<ContainerSimulator> containers = new ArrayList<>();
HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
appId, 0, SLSConfiguration.getAMContainerResource(conf), appId, 0, SLSConfiguration.getAMContainerResource(conf), "label1",
"label1", null); null, map);
app.firstStep(); app.firstStep();
verifySchedulerMetrics(appId); verifySchedulerMetrics(appId);