YARN-9293. Optimize MockAMLauncher event handling. Contributed by Bibin A Chundatt.

This commit is contained in:
bibinchundatt 2019-02-14 23:00:56 +05:30
parent ec08eed542
commit cc0ef524d9
6 changed files with 50 additions and 34 deletions

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
@ -105,6 +106,7 @@ public class SLSRunner extends Configured implements Tool {
// AM simulator // AM simulator
private int AM_ID; private int AM_ID;
private Map<String, AMSimulator> amMap; private Map<String, AMSimulator> amMap;
private Map<ApplicationId, AMSimulator> appIdAMSim;
private Set<String> trackedApps; private Set<String> trackedApps;
private Map<String, Class> amClassMap; private Map<String, Class> amClassMap;
private static int remainingApps = 0; private static int remainingApps = 0;
@ -162,7 +164,7 @@ public class SLSRunner extends Configured implements Tool {
queueAppNumMap = new HashMap<>(); queueAppNumMap = new HashMap<>();
amMap = new ConcurrentHashMap<>(); amMap = new ConcurrentHashMap<>();
amClassMap = new HashMap<>(); amClassMap = new HashMap<>();
appIdAMSim = new ConcurrentHashMap<>();
// runner configuration // runner configuration
setConf(tempConf); setConf(tempConf);
@ -269,7 +271,7 @@ public class SLSRunner extends Configured implements Tool {
rm = new ResourceManager() { rm = new ResourceManager() {
@Override @Override
protected ApplicationMasterLauncher createAMLauncher() { protected ApplicationMasterLauncher createAMLauncher() {
return new MockAMLauncher(se, this.rmContext, amMap); return new MockAMLauncher(se, this.rmContext, appIdAMSim);
} }
}; };
@ -551,7 +553,7 @@ public class SLSRunner extends Configured implements Tool {
try { try {
createAMForJob(job, baselineTimeMS); createAMForJob(job, baselineTimeMS);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to create an AM: {}", e.getMessage()); LOG.error("Failed to create an AM", e);
} }
job = reader.getNext(); job = reader.getNext();
@ -763,7 +765,7 @@ public class SLSRunner extends Configured implements Tool {
AM_ID++; AM_ID++;
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
runner.getStartTimeMS(), amContainerResource, params); runner.getStartTimeMS(), amContainerResource, params, appIdAMSim);
if(reservationId != null) { if(reservationId != null) {
// if we have a ReservationId, delegate reservation creation to // if we have a ReservationId, delegate reservation creation to
// AMSim (reservation shape is impl specific) // AMSim (reservation shape is impl specific)

View File

@ -114,6 +114,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
private ReservationSubmissionRequest reservationRequest; private ReservationSubmissionRequest reservationRequest;
private Map<ApplicationId, AMSimulator> appIdToAMSim;
public AMSimulator() { public AMSimulator() {
this.responseQueue = new LinkedBlockingQueue<>(); this.responseQueue = new LinkedBlockingQueue<>();
} }
@ -123,7 +125,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
List<ContainerSimulator> containerList, ResourceManager resourceManager, List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser, SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp, long baseTimeMS, String simQueue, boolean tracked, String oldApp, long baseTimeMS,
Resource amResource, Map<String, String> params) { Resource amResource, Map<String, String> params,
Map<ApplicationId, AMSimulator> appIdAMSim) {
super.init(startTime, startTime + 1000000L * heartbeatInterval, super.init(startTime, startTime + 1000000L * heartbeatInterval,
heartbeatInterval); heartbeatInterval);
this.user = simUser; this.user = simUser;
@ -136,6 +139,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
this.traceStartTimeMS = startTime; this.traceStartTimeMS = startTime;
this.traceFinishTimeMS = finishTime; this.traceFinishTimeMS = finishTime;
this.amContainerResource = amResource; this.amContainerResource = amResource;
this.appIdToAMSim = appIdAMSim;
} }
/** /**
@ -159,6 +163,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
// submit application, waiting until ACCEPTED // submit application, waiting until ACCEPTED
submitApp(reservationId); submitApp(reservationId);
// add submitted app to mapping
appIdToAMSim.put(appId, this);
// track app metrics // track app metrics
trackApp(); trackApp();
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -126,10 +127,11 @@ public class MRAMSimulator extends AMSimulator {
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue, long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS, boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, Map<String, String> params) { Resource amContainerResource, Map<String, String> params,
super.init(heartbeatInterval, containerList, rm, se, Map<ApplicationId, AMSimulator> appIdAMSim) {
traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
baselineStartTimeMS, amContainerResource, params); traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
amContainerResource, params, appIdAMSim);
amtype = "mapreduce"; amtype = "mapreduce";
// get map/reduce tasks // get map/reduce tasks

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -96,10 +97,11 @@ public class StreamAMSimulator extends AMSimulator {
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue, long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS, boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, Map<String, String> params) { Resource amContainerResource, Map<String, String> params,
Map<ApplicationId, AMSimulator> appIdAMSim) {
super.init(heartbeatInterval, containerList, rm, se, traceStartTime, super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
amContainerResource, params); amContainerResource, params, appIdAMSim);
amtype = "stream"; amtype = "stream";
allStreams.addAll(containerList); allStreams.addAll(containerList);

View File

@ -45,13 +45,14 @@ public class MockAMLauncher extends ApplicationMasterLauncher
private static final Log LOG = LogFactory.getLog( private static final Log LOG = LogFactory.getLog(
MockAMLauncher.class); MockAMLauncher.class);
Map<String, AMSimulator> amMap; private Map<ApplicationId, AMSimulator> appIdAMSim;
SLSRunner se; SLSRunner se;
public MockAMLauncher(SLSRunner se, RMContext rmContext, public MockAMLauncher(SLSRunner se, RMContext rmContext,
Map<String, AMSimulator> amMap) { Map<ApplicationId, AMSimulator> appIdAMSim) {
super(rmContext); super(rmContext);
this.amMap = amMap; this.appIdAMSim = appIdAMSim;
this.se = se; this.se = se;
} }
@ -86,30 +87,28 @@ public class MockAMLauncher extends ApplicationMasterLauncher
event.getAppAttempt().getAppAttemptId().getApplicationId(); event.getAppAttempt().getAppAttemptId().getApplicationId();
// find AMSimulator // find AMSimulator
for (AMSimulator ams : amMap.values()) { AMSimulator ams = appIdAMSim.get(appId);
if (ams.getApplicationId() != null && ams.getApplicationId().equals( if (ams != null) {
appId)) { try {
try { Container amContainer = event.getAppAttempt().getMasterContainer();
Container amContainer = event.getAppAttempt().getMasterContainer();
setupAMRMToken(event.getAppAttempt()); setupAMRMToken(event.getAppAttempt());
// Notify RMAppAttempt to change state // Notify RMAppAttempt to change state
super.context.getDispatcher().getEventHandler().handle( super.context.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(), new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
RMAppAttemptEventType.LAUNCHED)); RMAppAttemptEventType.LAUNCHED));
ams.notifyAMContainerLaunched( ams.notifyAMContainerLaunched(
event.getAppAttempt().getMasterContainer()); event.getAppAttempt().getMasterContainer());
LOG.info("Notify AM launcher launched:" + amContainer.getId()); LOG.info("Notify AM launcher launched:" + amContainer.getId());
se.getNmMap().get(amContainer.getNodeId()) se.getNmMap().get(amContainer.getNodeId())
.addNewContainer(amContainer, 100000000L); .addNewContainer(amContainer, 100000000L);
return; return;
} catch (Exception e) { } catch (Exception e) {
throw new YarnRuntimeException(e); throw new YarnRuntimeException(e);
}
} }
} }
@ -117,4 +116,5 @@ public class MockAMLauncher extends ApplicationMasterLauncher
"Didn't find any AMSimulator for applicationId=" + appId); "Didn't find any AMSimulator for applicationId=" + appId);
} }
} }
} }

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.sls.appmaster; package org.apache.hadoop.yarn.sls.appmaster;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import java.util.HashMap;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -139,8 +141,9 @@ public class TestAMSimulator {
String appId = "app1"; String appId = "app1";
String queue = "default"; String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>(); List<ContainerSimulator> containers = new ArrayList<>();
HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
appId, 0, SLSConfiguration.getAMContainerResource(conf), null); appId, 0, SLSConfiguration.getAMContainerResource(conf), null, map);
app.firstStep(); app.firstStep();
verifySchedulerMetrics(appId); verifySchedulerMetrics(appId);