YARN-6326. Shouldn't use AppAttemptIds to fetch applications while AM Simulator tracks app in SLS (yufeigu via rkanter)

This commit is contained in:
Robert Kanter 2017-03-21 15:21:11 -07:00
parent 0a05c5c598
commit cc938e99ec
7 changed files with 253 additions and 242 deletions

View File

@ -62,10 +62,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Logger;
@ -335,13 +331,12 @@ public abstract class AMSimulator extends TaskRunner.Task {
private void trackApp() {
if (isTracked) {
((SchedulerWrapper) rm.getResourceScheduler())
.addTrackedApp(appAttemptId, oldAppId);
.addTrackedApp(appId, oldAppId);
}
}
public void untrackApp() {
if (isTracked) {
((SchedulerWrapper) rm.getResourceScheduler())
.removeTrackedApp(appAttemptId, oldAppId);
((SchedulerWrapper) rm.getResourceScheduler()).removeTrackedApp(oldAppId);
}
}

View File

@ -18,16 +18,17 @@
package org.apache.hadoop.yarn.sls.scheduler;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.FSAppAttempt;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.sls.SLSRunner;
import com.codahale.metrics.Gauge;
import org.apache.hadoop.yarn.sls.SLSRunner;
@Private
@Unstable
@ -37,114 +38,131 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
private int totalVCores = Integer.MAX_VALUE;
private boolean maxReset = false;
/**
 * Resource figures of a fair-scheduler schedulable that are exported as
 * gauges. Each constant carries the lower-case token that is spliced into
 * the gauge name, e.g. {@code variable.app.<id>.<token>.memory}.
 */
@VisibleForTesting
public enum Metric {
  DEMAND("demand"),
  USAGE("usage"),
  MINSHARE("minshare"),
  MAXSHARE("maxshare"),
  FAIRSHARE("fairshare");

  /** Name segment used when building gauge keys; never reassigned. */
  private final String value;

  Metric(String value) {
    this.value = value;
  }

  /**
   * @return the token used for this metric inside gauge names
   */
  @VisibleForTesting
  public String getValue() {
    return value;
  }
}
public FairSchedulerMetrics() {
super();
appTrackedMetrics.add("demand.memory");
appTrackedMetrics.add("demand.vcores");
appTrackedMetrics.add("usage.memory");
appTrackedMetrics.add("usage.vcores");
appTrackedMetrics.add("minshare.memory");
appTrackedMetrics.add("minshare.vcores");
appTrackedMetrics.add("maxshare.memory");
appTrackedMetrics.add("maxshare.vcores");
appTrackedMetrics.add("fairshare.memory");
appTrackedMetrics.add("fairshare.vcores");
queueTrackedMetrics.add("demand.memory");
queueTrackedMetrics.add("demand.vcores");
queueTrackedMetrics.add("usage.memory");
queueTrackedMetrics.add("usage.vcores");
queueTrackedMetrics.add("minshare.memory");
queueTrackedMetrics.add("minshare.vcores");
queueTrackedMetrics.add("maxshare.memory");
queueTrackedMetrics.add("maxshare.vcores");
queueTrackedMetrics.add("fairshare.memory");
queueTrackedMetrics.add("fairshare.vcores");
for (Metric metric: Metric.values()) {
appTrackedMetrics.add(metric.value + ".memory");
appTrackedMetrics.add(metric.value + ".vcores");
queueTrackedMetrics.add(metric.value + ".memory");
queueTrackedMetrics.add(metric.value + ".vcores");
}
}
/**
 * Reads the memory component of one resource figure of a schedulable.
 *
 * @param schedulable the schedulable to inspect; {@code null} is tolerated
 * @param metric which resource figure to read
 * @return the memory size of the selected figure, or {@code 0} when
 *         {@code schedulable} is {@code null}
 */
private long getMemorySize(Schedulable schedulable, Metric metric) {
  // Nothing to report for a schedulable that could not be resolved.
  if (schedulable == null) {
    return 0L;
  }

  final long memory;
  switch (metric) {
  case DEMAND:
    memory = schedulable.getDemand().getMemorySize();
    break;
  case USAGE:
    memory = schedulable.getResourceUsage().getMemorySize();
    break;
  case MINSHARE:
    memory = schedulable.getMinShare().getMemorySize();
    break;
  case MAXSHARE:
    memory = schedulable.getMaxShare().getMemorySize();
    break;
  case FAIRSHARE:
    memory = schedulable.getFairShare().getMemorySize();
    break;
  default:
    memory = 0L;
    break;
  }
  return memory;
}
/**
 * Reads the vcore component of one resource figure of a schedulable.
 *
 * @param schedulable the schedulable to inspect; {@code null} is tolerated
 * @param metric which resource figure to read
 * @return the virtual-core count of the selected figure, or {@code 0} when
 *         {@code schedulable} is {@code null}
 */
private int getVirtualCores(Schedulable schedulable, Metric metric) {
  // Nothing to report for a schedulable that could not be resolved.
  if (schedulable == null) {
    return 0;
  }

  final int vcores;
  switch (metric) {
  case DEMAND:
    vcores = schedulable.getDemand().getVirtualCores();
    break;
  case USAGE:
    vcores = schedulable.getResourceUsage().getVirtualCores();
    break;
  case MINSHARE:
    vcores = schedulable.getMinShare().getVirtualCores();
    break;
  case MAXSHARE:
    vcores = schedulable.getMaxShare().getVirtualCores();
    break;
  case FAIRSHARE:
    vcores = schedulable.getFairShare().getVirtualCores();
    break;
  default:
    vcores = 0;
    break;
  }
  return vcores;
}
/**
 * Registers the memory and vcores gauges of one metric for a tracked app.
 * The gauges are named
 * {@code variable.app.<oldAppId>.<metric>.memory} and
 * {@code variable.app.<oldAppId>.<metric>.vcores}. Each read looks the
 * attempt up again through {@code getSchedulerAppAttempt(appId)} rather
 * than holding on to an attempt object.
 *
 * @param appId id used to look up the scheduler-side attempt on each read
 * @param oldAppId app identifier used inside the gauge names
 * @param metric the resource figure the two gauges report
 */
private void registerAppMetrics(ApplicationId appId, String oldAppId,
    Metric metric) {
  final String gaugePrefix = "variable.app." + oldAppId + "." + metric.value;

  Gauge<Long> memoryGauge = new Gauge<Long>() {
    @Override
    public Long getValue() {
      FSAppAttempt attempt = (FSAppAttempt) getSchedulerAppAttempt(appId);
      return getMemorySize(attempt, metric);
    }
  };
  metrics.register(gaugePrefix + ".memory", memoryGauge);

  Gauge<Integer> vcoresGauge = new Gauge<Integer>() {
    @Override
    public Integer getValue() {
      FSAppAttempt attempt = (FSAppAttempt) getSchedulerAppAttempt(appId);
      return getVirtualCores(attempt, metric);
    }
  };
  metrics.register(gaugePrefix + ".vcores", vcoresGauge);
}
@Override
public void trackApp(ApplicationAttemptId appAttemptId, String oldAppId) {
super.trackApp(appAttemptId, oldAppId);
FairScheduler fair = (FairScheduler) scheduler;
final FSAppAttempt app = fair.getSchedulerApp(appAttemptId);
metrics.register("variable.app." + oldAppId + ".demand.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return app.getDemand().getMemorySize();
public void trackApp(ApplicationId appId, String oldAppId) {
super.trackApp(appId, oldAppId);
for (Metric metric: Metric.values()) {
registerAppMetrics(appId, oldAppId, metric);
}
}
private void registerQueueMetrics(FSQueue queue, Metric metric) {
metrics.register(
"variable.queue." + queue.getName() + "." + metric.value + ".memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return getMemorySize(queue, metric);
}
}
}
);
metrics.register("variable.app." + oldAppId + ".demand.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
return app.getDemand().getVirtualCores();
metrics.register(
"variable.queue." + queue.getName() + "." + metric.value + ".vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
return getVirtualCores(queue, metric);
}
}
}
);
metrics.register("variable.app." + oldAppId + ".usage.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return app.getResourceUsage().getMemorySize();
}
}
);
metrics.register("variable.app." + oldAppId + ".usage.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
return app.getResourceUsage().getVirtualCores();
}
}
);
metrics.register("variable.app." + oldAppId + ".minshare.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return app.getMinShare().getMemorySize();
}
}
);
metrics.register("variable.app." + oldAppId + ".minshare.vcores",
new Gauge<Long>() {
@Override
public Long getValue() {
return app.getMinShare().getMemorySize();
}
}
);
metrics.register("variable.app." + oldAppId + ".maxshare.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Math.min(app.getMaxShare().getMemorySize(), totalMemoryMB);
}
}
);
metrics.register("variable.app." + oldAppId + ".maxshare.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
return Math.min(app.getMaxShare().getVirtualCores(), totalVCores);
}
}
);
metrics.register("variable.app." + oldAppId + ".fairshare.memory",
new Gauge<Integer>() {
@Override
public Integer getValue() {
return app.getFairShare().getVirtualCores();
}
}
);
metrics.register("variable.app." + oldAppId + ".fairshare.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
return app.getFairShare().getVirtualCores();
}
}
);
}
@ -153,54 +171,11 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
trackedQueues.add(queueName);
FairScheduler fair = (FairScheduler) scheduler;
final FSQueue queue = fair.getQueueManager().getQueue(queueName);
metrics.register("variable.queue." + queueName + ".demand.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return queue.getDemand().getMemorySize();
}
}
);
metrics.register("variable.queue." + queueName + ".demand.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
return queue.getDemand().getVirtualCores();
}
}
);
metrics.register("variable.queue." + queueName + ".usage.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return queue.getResourceUsage().getMemorySize();
}
}
);
metrics.register("variable.queue." + queueName + ".usage.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
return queue.getResourceUsage().getVirtualCores();
}
}
);
metrics.register("variable.queue." + queueName + ".minshare.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return queue.getMinShare().getMemorySize();
}
}
);
metrics.register("variable.queue." + queueName + ".minshare.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
return queue.getMinShare().getVirtualCores();
}
}
);
registerQueueMetrics(queue, Metric.DEMAND);
registerQueueMetrics(queue, Metric.USAGE);
registerQueueMetrics(queue, Metric.MINSHARE);
registerQueueMetrics(queue, Metric.FAIRSHARE);
metrics.register("variable.queue." + queueName + ".maxshare.memory",
new Gauge<Long>() {
@Override
@ -233,36 +208,17 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
}
}
);
metrics.register("variable.queue." + queueName + ".fairshare.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return queue.getFairShare().getMemorySize();
}
}
);
metrics.register("variable.queue." + queueName + ".fairshare.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
return queue.getFairShare().getVirtualCores();
}
}
);
}
@Override
public void untrackQueue(String queueName) {
trackedQueues.remove(queueName);
metrics.remove("variable.queue." + queueName + ".demand.memory");
metrics.remove("variable.queue." + queueName + ".demand.vcores");
metrics.remove("variable.queue." + queueName + ".usage.memory");
metrics.remove("variable.queue." + queueName + ".usage.vcores");
metrics.remove("variable.queue." + queueName + ".minshare.memory");
metrics.remove("variable.queue." + queueName + ".minshare.vcores");
metrics.remove("variable.queue." + queueName + ".maxshare.memory");
metrics.remove("variable.queue." + queueName + ".maxshare.vcores");
metrics.remove("variable.queue." + queueName + ".fairshare.memory");
metrics.remove("variable.queue." + queueName + ".fairshare.vcores");
for (Metric metric: Metric.values()) {
metrics.remove("variable.queue." + queueName + "." +
metric.value + ".memory");
metrics.remove("variable.queue." + queueName + "." +
metric.value + ".vcores");
}
}
}

View File

@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -793,17 +792,15 @@ final public class ResourceSchedulerWrapper
}
// API open to out classes
public void addTrackedApp(ApplicationAttemptId appAttemptId,
String oldAppId) {
public void addTrackedApp(ApplicationId appId, String oldAppId) {
if (metricsON) {
schedulerMetrics.trackApp(appAttemptId, oldAppId);
schedulerMetrics.trackApp(appId, oldAppId);
}
}
public void removeTrackedApp(ApplicationAttemptId appAttemptId,
String oldAppId) {
public void removeTrackedApp(String oldAppId) {
if (metricsON) {
schedulerMetrics.untrackApp(appAttemptId, oldAppId);
schedulerMetrics.untrackApp(oldAppId);
}
}

View File

@ -839,17 +839,16 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
}
// API open to out classes
public void addTrackedApp(ApplicationAttemptId appAttemptId,
public void addTrackedApp(ApplicationId appId,
String oldAppId) {
if (metricsON) {
schedulerMetrics.trackApp(appAttemptId, oldAppId);
schedulerMetrics.trackApp(appId, oldAppId);
}
}
public void removeTrackedApp(ApplicationAttemptId appAttemptId,
String oldAppId) {
public void removeTrackedApp(String oldAppId) {
if (metricsON) {
schedulerMetrics.untrackApp(appAttemptId, oldAppId);
schedulerMetrics.untrackApp(oldAppId);
}
}

View File

@ -23,11 +23,11 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerAppReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
@ -42,42 +42,61 @@ public abstract class SchedulerMetrics {
protected Set<String> queueTrackedMetrics;
public SchedulerMetrics() {
appTrackedMetrics = new HashSet<String>();
appTrackedMetrics = new HashSet<>();
appTrackedMetrics.add("live.containers");
appTrackedMetrics.add("reserved.containers");
queueTrackedMetrics = new HashSet<String>();
queueTrackedMetrics = new HashSet<>();
}
public void init(ResourceScheduler scheduler, MetricRegistry metrics) {
this.scheduler = scheduler;
this.trackedQueues = new HashSet<String>();
this.trackedQueues = new HashSet<>();
this.metrics = metrics;
}
public void trackApp(final ApplicationAttemptId appAttemptId,
String oldAppId) {
/**
 * Looks up the current attempt of an application in the scheduler's
 * application map.
 *
 * @param appId id of the application to look up
 * @return the application's current attempt as reported by
 *         {@code getCurrentAppAttempt()}, or {@code null} when the
 *         scheduler has no entry for {@code appId}
 */
protected SchedulerApplicationAttempt getSchedulerAppAttempt(
    ApplicationId appId) {
  // The cast assumes the configured scheduler extends AbstractYarnScheduler.
  AbstractYarnScheduler abstractScheduler = (AbstractYarnScheduler) scheduler;
  SchedulerApplication schedulerApp = (SchedulerApplication)
      abstractScheduler.getSchedulerApplications().get(appId);
  return schedulerApp == null ? null : schedulerApp.getCurrentAppAttempt();
}
public void trackApp(final ApplicationId appId, String oldAppId) {
metrics.register("variable.app." + oldAppId + ".live.containers",
new Gauge<Integer>() {
@Override
public Integer getValue() {
SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId);
return app.getLiveContainers().size();
new Gauge<Integer>() {
@Override
public Integer getValue() {
SchedulerApplicationAttempt appAttempt =
getSchedulerAppAttempt(appId);
if (appAttempt != null) {
return appAttempt.getLiveContainers().size();
} else {
return 0;
}
}
}
}
);
metrics.register("variable.app." + oldAppId + ".reserved.containers",
new Gauge<Integer>() {
@Override
public Integer getValue() {
SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId);
return app.getReservedContainers().size();
new Gauge<Integer>() {
@Override
public Integer getValue() {
SchedulerApplicationAttempt appAttempt =
getSchedulerAppAttempt(appId);
if (appAttempt != null) {
return appAttempt.getReservedContainers().size();
} else {
return 0;
}
}
}
}
);
}
public void untrackApp(ApplicationAttemptId appAttemptId,
String oldAppId) {
public void untrackApp(String oldAppId) {
for (String m : appTrackedMetrics) {
metrics.remove("variable.app." + oldAppId + "." + m);
}

View File

@ -21,7 +21,6 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import com.codahale.metrics.MetricRegistry;
@ -30,18 +29,16 @@ import com.codahale.metrics.MetricRegistry;
@Unstable
public interface SchedulerWrapper {
public MetricRegistry getMetrics();
public SchedulerMetrics getSchedulerMetrics();
public Set<String> getQueueSet();
public void setQueueSet(Set<String> queues);
public Set<String> getTrackedAppSet();
public void setTrackedAppSet(Set<String> apps);
public void addTrackedApp(ApplicationAttemptId appAttemptId,
String oldAppId);
public void removeTrackedApp(ApplicationAttemptId appAttemptId,
String oldAppId);
public void addAMRuntime(ApplicationId appId,
long traceStartTimeMS, long traceEndTimeMS,
long simulateStartTimeMS, long simulateEndTimeMS);
MetricRegistry getMetrics();
SchedulerMetrics getSchedulerMetrics();
Set<String> getQueueSet();
void setQueueSet(Set<String> queues);
Set<String> getTrackedAppSet();
void setTrackedAppSet(Set<String> apps);
void addTrackedApp(ApplicationId appId, String oldAppId);
void removeTrackedApp(String oldAppId);
void addAMRuntime(ApplicationId appId,
long traceStartTimeMS, long traceEndTimeMS,
long simulateStartTimeMS, long simulateEndTimeMS);
}

View File

@ -17,32 +17,43 @@
*/
package org.apache.hadoop.yarn.sls.appmaster;
import com.codahale.metrics.MetricRegistry;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.FairSchedulerMetrics;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
public class TestAMSimulator {
private ResourceManager rm;
private YarnConfiguration conf;
private Path metricOutputDir;
@Before
public void setup() {
createMetricOutputDir();
conf = new YarnConfiguration();
conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricOutputDir.toString());
conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper");
conf.set(SLSConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
conf.setBoolean(SLSConfiguration.METRICS_SWITCH, false);
conf.setBoolean(SLSConfiguration.METRICS_SWITCH, true);
rm = new ResourceManager();
rm.init(conf);
rm.start();
@ -64,14 +75,49 @@ public class TestAMSimulator {
}
}
/**
 * Asserts that, for every {@link FairSchedulerMetrics.Metric}, the
 * {@code variable.app.<appId>.<metric>.memory} gauge is registered and
 * yields a non-null value.
 *
 * @param appId app identifier used inside the gauge names
 */
private void verifySchedulerMetrics(String appId) {
  MetricRegistry registry =
      ((SchedulerWrapper) rm.getResourceScheduler()).getMetrics();
  final String keyPrefix = "variable.app." + appId + ".";
  for (FairSchedulerMetrics.Metric m : FairSchedulerMetrics.Metric.values()) {
    String gaugeName = keyPrefix + m.getValue() + ".memory";
    Assert.assertTrue(registry.getGauges().containsKey(gaugeName));
    Assert.assertNotNull(registry.getGauges().get(gaugeName).getValue());
  }
}
/**
 * Creates a fresh temporary directory for metric output and stores it in
 * {@code metricOutputDir}. The parent directory comes from the
 * {@code test.build.data} system property; when the property is unset
 * (for example when the test is started from an IDE), a build-local
 * default is used instead of failing with a NullPointerException. Any
 * I/O failure fails the test.
 */
private void createMetricOutputDir() {
  Path testDir =
      Paths.get(System.getProperty("test.build.data", "target/test-dir"));
  try {
    // createTempDirectory requires an existing parent; this is a no-op
    // when the directory is already there.
    Files.createDirectories(testDir);
    metricOutputDir = Files.createTempDirectory(testDir, "output");
  } catch (IOException e) {
    Assert.fail(e.toString());
  }
}
/**
 * Recursively deletes the metric output directory. Does nothing when the
 * directory was never created (setup failed early), so cleanup does not
 * raise a NullPointerException that would hide the original failure. An
 * I/O failure while deleting fails the test.
 */
private void deleteMetricOutputDir() {
  if (metricOutputDir == null) {
    return;
  }
  try {
    FileUtils.deleteDirectory(metricOutputDir.toFile());
  } catch (IOException e) {
    Assert.fail(e.toString());
  }
}
@Test
public void testAMSimulator() throws Exception {
// Register one app
MockAMSimulator app = new MockAMSimulator();
List<ContainerSimulator> containers = new ArrayList<ContainerSimulator>();
app.init(1, 1000, containers, rm, null, 0, 1000000l, "user1", "default",
false, "app1");
String appId = "app1";
String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>();
app.init(1, 1000, containers, rm, null, 0, 1000000L, "user1", queue,
true, appId);
app.firstStep();
verifySchedulerMetrics(appId);
Assert.assertEquals(1, rm.getRMContext().getRMApps().size());
Assert.assertNotNull(rm.getRMContext().getRMApps().get(app.appId));
@ -82,5 +128,7 @@ public class TestAMSimulator {
/**
 * Stops the ResourceManager and removes the metric output directory.
 * The directory is removed even when stopping the ResourceManager throws,
 * and a ResourceManager that was never created is skipped.
 */
@After
public void tearDown() {
  try {
    if (rm != null) {
      rm.stop();
    }
  } finally {
    deleteMetricOutputDir();
  }
}
}