YARN-1471. Preserve scheduler typeinfo in simulator to work with resource monitors.

Contributed by Carlo Curino. Backport by Ye Zhou.

(cherry picked from commit e07f3c87cd)
This commit is contained in:
Christopher Douglas 2017-06-05 16:01:14 -07:00 committed by Konstantin V Shvachko
parent 1d3fa87011
commit d013f4134b
8 changed files with 958 additions and 19 deletions

View File

@ -54,12 +54,16 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper; import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonFactory;
@ -144,9 +148,9 @@ public class SLSRunner {
// start application masters // start application masters
startAM(); startAM();
// set queue & tracked apps information // set queue & tracked apps information
((ResourceSchedulerWrapper) rm.getResourceScheduler()) ((SchedulerWrapper) rm.getResourceScheduler())
.setQueueSet(this.queueAppNumMap.keySet()); .setQueueSet(this.queueAppNumMap.keySet());
((ResourceSchedulerWrapper) rm.getResourceScheduler()) ((SchedulerWrapper) rm.getResourceScheduler())
.setTrackedAppSet(this.trackedApps); .setTrackedAppSet(this.trackedApps);
// print out simulation info // print out simulation info
printSimulationInfo(); printSimulationInfo();
@ -155,13 +159,24 @@ public class SLSRunner {
// starting the runner once everything is ready to go, // starting the runner once everything is ready to go,
runner.start(); runner.start();
} }
private void startRM() throws IOException, ClassNotFoundException { private void startRM() throws IOException, ClassNotFoundException {
Configuration rmConf = new YarnConfiguration(); Configuration rmConf = new YarnConfiguration();
String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass);
rmConf.set(YarnConfiguration.RM_SCHEDULER, // For CapacityScheduler we use a sub-classing instead of wrapping
ResourceSchedulerWrapper.class.getName()); // to allow scheduler-specific invocations from monitors to work
// this can be used for other schedulers as well if we care to
// exercise/track behaviors that are not common to the scheduler api
if(Class.forName(schedulerClass) == CapacityScheduler.class) {
rmConf.set(YarnConfiguration.RM_SCHEDULER,
SLSCapacityScheduler.class.getName());
} else {
rmConf.set(YarnConfiguration.RM_SCHEDULER,
ResourceSchedulerWrapper.class.getName());
rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass);
}
rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
rm = new ResourceManager(); rm = new ResourceManager();
rm.init(rmConf); rm.init(rmConf);

View File

@ -69,7 +69,7 @@ import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper; import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.sls.utils.SLSUtils;
@ -190,7 +190,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
simulateFinishTimeMS = System.currentTimeMillis() - simulateFinishTimeMS = System.currentTimeMillis() -
SLSRunner.getRunner().getStartTimeMS(); SLSRunner.getRunner().getStartTimeMS();
// record job running information // record job running information
((ResourceSchedulerWrapper)rm.getResourceScheduler()) ((SchedulerWrapper)rm.getResourceScheduler())
.addAMRuntime(appId, .addAMRuntime(appId,
traceStartTimeMS, traceFinishTimeMS, traceStartTimeMS, traceFinishTimeMS,
simulateStartTimeMS, simulateFinishTimeMS); simulateStartTimeMS, simulateFinishTimeMS);
@ -315,13 +315,13 @@ public abstract class AMSimulator extends TaskRunner.Task {
private void trackApp() { private void trackApp() {
if (isTracked) { if (isTracked) {
((ResourceSchedulerWrapper) rm.getResourceScheduler()) ((SchedulerWrapper) rm.getResourceScheduler())
.addTrackedApp(appAttemptId, oldAppId); .addTrackedApp(appAttemptId, oldAppId);
} }
} }
public void untrackApp() { public void untrackApp() {
if (isTracked) { if (isTracked) {
((ResourceSchedulerWrapper) rm.getResourceScheduler()) ((SchedulerWrapper) rm.getResourceScheduler())
.removeTrackedApp(appAttemptId, oldAppId); .removeTrackedApp(appAttemptId, oldAppId);
} }
} }

View File

@ -104,7 +104,7 @@ import com.codahale.metrics.Timer;
@Unstable @Unstable
public class ResourceSchedulerWrapper public class ResourceSchedulerWrapper
extends AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> extends AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>
implements ResourceScheduler, Configurable { implements SchedulerWrapper, ResourceScheduler, Configurable {
private static final String EOL = System.getProperty("line.separator"); private static final String EOL = System.getProperty("line.separator");
private static final int SAMPLING_SIZE = 60; private static final int SAMPLING_SIZE = 60;
private ScheduledExecutorService pool; private ScheduledExecutorService pool;
@ -169,9 +169,8 @@ public class ResourceSchedulerWrapper
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
this.conf = conf; this.conf = conf;
// set scheduler // set scheduler
Class<? extends ResourceScheduler> klass = Class<? extends ResourceScheduler> klass = conf.getClass(
conf.getClass(SLSConfiguration.RM_SCHEDULER, null, SLSConfiguration.RM_SCHEDULER, null, ResourceScheduler.class);
ResourceScheduler.class);
scheduler = ReflectionUtils.newInstance(klass, conf); scheduler = ReflectionUtils.newInstance(klass, conf);
// start metrics // start metrics
@ -980,4 +979,3 @@ public class ResourceSchedulerWrapper
} }
} }

View File

@ -0,0 +1,812 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.scheduler;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.web.SLSWebApp;
import com.codahale.metrics.Counter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.Timer;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
.FifoScheduler;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Logger;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class SLSCapacityScheduler extends CapacityScheduler implements
SchedulerWrapper,Configurable {
private static final String EOL = System.getProperty("line.separator");
private static final int SAMPLING_SIZE = 60;
private ScheduledExecutorService pool;
// counters for scheduler allocate/handle operations
private Counter schedulerAllocateCounter;
private Counter schedulerHandleCounter;
private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
// Timers for scheduler allocate/handle operations
private Timer schedulerAllocateTimer;
private Timer schedulerHandleTimer;
private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
private List<Histogram> schedulerHistogramList;
private Map<Histogram, Timer> histogramTimerMap;
private Lock samplerLock;
private Lock queueLock;
private Configuration conf;
private Map<ApplicationAttemptId, String> appQueueMap =
new ConcurrentHashMap<ApplicationAttemptId, String>();
private BufferedWriter jobRuntimeLogBW;
// Priority of the ResourceSchedulerWrapper shutdown hook.
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
// web app
private SLSWebApp web;
private Map<ContainerId, Resource> preemptionContainerMap =
new ConcurrentHashMap<ContainerId, Resource>();
// metrics
private MetricRegistry metrics;
private SchedulerMetrics schedulerMetrics;
private boolean metricsON;
private String metricsOutputDir;
private BufferedWriter metricsLogBW;
private boolean running = false;
private static Map<Class, Class> defaultSchedulerMetricsMap =
new HashMap<Class, Class>();
static {
defaultSchedulerMetricsMap.put(FairScheduler.class,
FairSchedulerMetrics.class);
defaultSchedulerMetricsMap.put(FifoScheduler.class,
FifoSchedulerMetrics.class);
defaultSchedulerMetricsMap.put(CapacityScheduler.class,
CapacitySchedulerMetrics.class);
}
// must set by outside
private Set<String> queueSet;
private Set<String> trackedAppSet;
public final Logger LOG = Logger.getLogger(SLSCapacityScheduler.class);
public SLSCapacityScheduler() {
samplerLock = new ReentrantLock();
queueLock = new ReentrantLock();
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
super.setConf(conf);
// start metrics
metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
if (metricsON) {
try {
initMetrics();
} catch (Exception e) {
e.printStackTrace();
}
}
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override
public void run() {
try {
if (metricsLogBW != null) {
metricsLogBW.write("]");
metricsLogBW.close();
}
if (web != null) {
web.stop();
}
tearDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}, SHUTDOWN_HOOK_PRIORITY);
}
@Override
public Allocation allocate(ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests,
List<ContainerId> containerIds,
List<String> strings, List<String> strings2,
List<UpdateContainerRequest> increaseRequests,
List<UpdateContainerRequest> decreaseRequests) {
if (metricsON) {
final Timer.Context context = schedulerAllocateTimer.time();
Allocation allocation = null;
try {
allocation = super.allocate(attemptId, resourceRequests, containerIds,
strings, strings2, increaseRequests, decreaseRequests);
return allocation;
} finally {
context.stop();
schedulerAllocateCounter.inc();
try {
updateQueueWithAllocateRequest(allocation, attemptId,
resourceRequests, containerIds);
} catch (IOException e) {
e.printStackTrace();
}
}
} else {
return super.allocate(attemptId, resourceRequests, containerIds,
strings, strings2, increaseRequests, decreaseRequests);
}
}
@Override
public void handle(SchedulerEvent schedulerEvent) {
// metrics off
if (! metricsON) {
super.handle(schedulerEvent);
return;
}
if(!running) running = true;
// metrics on
Timer.Context handlerTimer = null;
Timer.Context operationTimer = null;
NodeUpdateSchedulerEventWrapper eventWrapper;
try {
//if (schedulerEvent instanceof NodeUpdateSchedulerEvent) {
if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
&& schedulerEvent instanceof NodeUpdateSchedulerEvent) {
eventWrapper = new NodeUpdateSchedulerEventWrapper(
(NodeUpdateSchedulerEvent)schedulerEvent);
schedulerEvent = eventWrapper;
updateQueueWithNodeUpdate(eventWrapper);
} else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
// check if having AM Container, update resource usage information
AppAttemptRemovedSchedulerEvent appRemoveEvent =
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
ApplicationAttemptId appAttemptId =
appRemoveEvent.getApplicationAttemptID();
String queue = appQueueMap.get(appAttemptId);
SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId);
if (! app.getLiveContainers().isEmpty()) { // have 0 or 1
// should have one container which is AM container
RMContainer rmc = app.getLiveContainers().iterator().next();
updateQueueMetrics(queue,
rmc.getContainer().getResource().getMemory(),
rmc.getContainer().getResource().getVirtualCores());
}
}
handlerTimer = schedulerHandleTimer.time();
operationTimer = schedulerHandleTimerMap
.get(schedulerEvent.getType()).time();
super.handle(schedulerEvent);
} finally {
if (handlerTimer != null) handlerTimer.stop();
if (operationTimer != null) operationTimer.stop();
schedulerHandleCounter.inc();
schedulerHandleCounterMap.get(schedulerEvent.getType()).inc();
if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
SLSRunner.decreaseRemainingApps();
AppAttemptRemovedSchedulerEvent appRemoveEvent =
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
ApplicationAttemptId appAttemptId =
appRemoveEvent.getApplicationAttemptID();
appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
} else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_ADDED
&& schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
AppAttemptAddedSchedulerEvent appAddEvent =
(AppAttemptAddedSchedulerEvent) schedulerEvent;
SchedulerApplication app =
applications.get(appAddEvent.getApplicationAttemptId().getApplicationId());
appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue().getQueueName());
}
}
}
private void updateQueueWithNodeUpdate(
NodeUpdateSchedulerEventWrapper eventWrapper) {
RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
for (UpdatedContainerInfo info : containerList) {
for (ContainerStatus status : info.getCompletedContainers()) {
ContainerId containerId = status.getContainerId();
SchedulerAppReport app = super.getSchedulerAppInfo(
containerId.getApplicationAttemptId());
if (app == null) {
// this happens for the AM container
// The app have already removed when the NM sends the release
// information.
continue;
}
String queue = appQueueMap.get(containerId.getApplicationAttemptId());
int releasedMemory = 0, releasedVCores = 0;
if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
for (RMContainer rmc : app.getLiveContainers()) {
if (rmc.getContainerId() == containerId) {
releasedMemory += rmc.getContainer().getResource().getMemory();
releasedVCores += rmc.getContainer()
.getResource().getVirtualCores();
break;
}
}
} else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
if (preemptionContainerMap.containsKey(containerId)) {
Resource preResource = preemptionContainerMap.get(containerId);
releasedMemory += preResource.getMemory();
releasedVCores += preResource.getVirtualCores();
preemptionContainerMap.remove(containerId);
}
}
// update queue counters
updateQueueMetrics(queue, releasedMemory, releasedVCores);
}
}
}
private void updateQueueWithAllocateRequest(Allocation allocation,
ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests,
List<ContainerId> containerIds) throws IOException {
// update queue information
Resource pendingResource = Resources.createResource(0, 0);
Resource allocatedResource = Resources.createResource(0, 0);
String queueName = appQueueMap.get(attemptId);
// container requested
for (ResourceRequest request : resourceRequests) {
if (request.getResourceName().equals(ResourceRequest.ANY)) {
Resources.addTo(pendingResource,
Resources.multiply(request.getCapability(),
request.getNumContainers()));
}
}
// container allocated
for (Container container : allocation.getContainers()) {
Resources.addTo(allocatedResource, container.getResource());
Resources.subtractFrom(pendingResource, container.getResource());
}
// container released from AM
SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
for (ContainerId containerId : containerIds) {
Container container = null;
for (RMContainer c : report.getLiveContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
// released allocated containers
Resources.subtractFrom(allocatedResource, container.getResource());
} else {
for (RMContainer c : report.getReservedContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
// released reserved containers
Resources.subtractFrom(pendingResource, container.getResource());
}
}
}
// containers released/preemption from scheduler
Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
if (allocation.getContainerPreemptions() != null) {
preemptionContainers.addAll(allocation.getContainerPreemptions());
}
if (allocation.getStrictContainerPreemptions() != null) {
preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
}
if (! preemptionContainers.isEmpty()) {
for (ContainerId containerId : preemptionContainers) {
if (! preemptionContainerMap.containsKey(containerId)) {
Container container = null;
for (RMContainer c : report.getLiveContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
preemptionContainerMap.put(containerId, container.getResource());
}
}
}
}
// update metrics
SortedMap<String, Counter> counterMap = metrics.getCounters();
String names[] = new String[]{
"counter.queue." + queueName + ".pending.memory",
"counter.queue." + queueName + ".pending.cores",
"counter.queue." + queueName + ".allocated.memory",
"counter.queue." + queueName + ".allocated.cores"};
int values[] = new int[]{pendingResource.getMemory(),
pendingResource.getVirtualCores(),
allocatedResource.getMemory(), allocatedResource.getVirtualCores()};
for (int i = names.length - 1; i >= 0; i --) {
if (! counterMap.containsKey(names[i])) {
metrics.counter(names[i]);
counterMap = metrics.getCounters();
}
counterMap.get(names[i]).inc(values[i]);
}
queueLock.lock();
try {
if (! schedulerMetrics.isTracked(queueName)) {
schedulerMetrics.trackQueue(queueName);
}
} finally {
queueLock.unlock();
}
}
private void tearDown() throws IOException {
// close job runtime writer
if (jobRuntimeLogBW != null) {
jobRuntimeLogBW.close();
}
// shut pool
if (pool != null) pool.shutdown();
}
@SuppressWarnings("unchecked")
private void initMetrics() throws Exception {
metrics = new MetricRegistry();
// configuration
metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
int metricsWebAddressPort = conf.getInt(
SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
// create SchedulerMetrics for current scheduler
String schedulerMetricsType = conf.get(CapacityScheduler.class.getName());
Class schedulerMetricsClass = schedulerMetricsType == null?
defaultSchedulerMetricsMap.get(CapacityScheduler.class) :
Class.forName(schedulerMetricsType);
schedulerMetrics = (SchedulerMetrics)ReflectionUtils
.newInstance(schedulerMetricsClass, new Configuration());
schedulerMetrics.init(this, metrics);
// register various metrics
registerJvmMetrics();
registerClusterResourceMetrics();
registerContainerAppNumMetrics();
registerSchedulerMetrics();
// .csv output
initMetricsCSVOutput();
// start web app to provide real-time tracking
web = new SLSWebApp(this, metricsWebAddressPort);
web.start();
// a thread to update histogram timer
pool = new ScheduledThreadPoolExecutor(2);
pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
TimeUnit.MILLISECONDS);
// a thread to output metrics for real-tiem tracking
pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
TimeUnit.MILLISECONDS);
// application running information
jobRuntimeLogBW = new BufferedWriter(
new FileWriter(metricsOutputDir + "/jobruntime.csv"));
jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
"simulate_start_time,simulate_end_time" + EOL);
jobRuntimeLogBW.flush();
}
private void registerJvmMetrics() {
// add JVM gauges
metrics.register("variable.jvm.free.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().freeMemory();
}
}
);
metrics.register("variable.jvm.max.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().maxMemory();
}
}
);
metrics.register("variable.jvm.total.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().totalMemory();
}
}
);
}
private void registerClusterResourceMetrics() {
metrics.register("variable.cluster.allocated.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
if( getRootQueueMetrics() == null) {
return 0L;
} else {
return getRootQueueMetrics().getAllocatedMB();
}
}
}
);
metrics.register("variable.cluster.allocated.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0;
} else {
return getRootQueueMetrics().getAllocatedVirtualCores();
}
}
}
);
metrics.register("variable.cluster.available.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
if(getRootQueueMetrics() == null) {
return 0L;
} else {
return getRootQueueMetrics().getAvailableMB();
}
}
}
);
metrics.register("variable.cluster.available.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0;
} else {
return getRootQueueMetrics().getAvailableVirtualCores();
}
}
}
);
}
private void registerContainerAppNumMetrics() {
metrics.register("variable.running.application",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0;
} else {
return getRootQueueMetrics().getAppsRunning();
}
}
}
);
metrics.register("variable.running.container",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0;
} else {
return getRootQueueMetrics().getAllocatedContainers();
}
}
}
);
}
private void registerSchedulerMetrics() {
samplerLock.lock();
try {
// counters for scheduler operations
schedulerAllocateCounter = metrics.counter(
"counter.scheduler.operation.allocate");
schedulerHandleCounter = metrics.counter(
"counter.scheduler.operation.handle");
schedulerHandleCounterMap = new HashMap<SchedulerEventType, Counter>();
for (SchedulerEventType e : SchedulerEventType.values()) {
Counter counter = metrics.counter(
"counter.scheduler.operation.handle." + e);
schedulerHandleCounterMap.put(e, counter);
}
// timers for scheduler operations
int timeWindowSize = conf.getInt(
SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
schedulerAllocateTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>();
for (SchedulerEventType e : SchedulerEventType.values()) {
Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimerMap.put(e, timer);
}
// histogram for scheduler operations (Samplers)
schedulerHistogramList = new ArrayList<Histogram>();
histogramTimerMap = new HashMap<Histogram, Timer>();
Histogram schedulerAllocateHistogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register("sampler.scheduler.operation.allocate.timecost",
schedulerAllocateHistogram);
schedulerHistogramList.add(schedulerAllocateHistogram);
histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer);
Histogram schedulerHandleHistogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register("sampler.scheduler.operation.handle.timecost",
schedulerHandleHistogram);
schedulerHistogramList.add(schedulerHandleHistogram);
histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
for (SchedulerEventType e : SchedulerEventType.values()) {
Histogram histogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register(
"sampler.scheduler.operation.handle." + e + ".timecost",
histogram);
schedulerHistogramList.add(histogram);
histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
}
} finally {
samplerLock.unlock();
}
}
private void initMetricsCSVOutput() {
int timeIntervalMS = conf.getInt(
SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
File dir = new File(metricsOutputDir + "/metrics");
if(! dir.exists()
&& ! dir.mkdirs()) {
LOG.error("Cannot create directory " + dir.getAbsoluteFile());
}
final CsvReporter reporter = CsvReporter.forRegistry(metrics)
.formatFor(Locale.US)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build(new File(metricsOutputDir + "/metrics"));
reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
}
class HistogramsRunnable implements Runnable {
@Override
public void run() {
samplerLock.lock();
try {
for (Histogram histogram : schedulerHistogramList) {
Timer timer = histogramTimerMap.get(histogram);
histogram.update((int) timer.getSnapshot().getMean());
}
} finally {
samplerLock.unlock();
}
}
}
class MetricsLogRunnable implements Runnable {
private boolean firstLine = true;
public MetricsLogRunnable() {
try {
metricsLogBW = new BufferedWriter(
new FileWriter(metricsOutputDir + "/realtimetrack.json"));
metricsLogBW.write("[");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
if(running) {
// all WebApp to get real tracking json
String metrics = web.generateRealTimeTrackingMetrics();
// output
try {
if(firstLine) {
metricsLogBW.write(metrics + EOL);
firstLine = false;
} else {
metricsLogBW.write("," + metrics + EOL);
}
metricsLogBW.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
// the following functions are used by AMSimulator
public void addAMRuntime(ApplicationId appId,
long traceStartTimeMS, long traceEndTimeMS,
long simulateStartTimeMS, long simulateEndTimeMS) {
try {
// write job runtime information
StringBuilder sb = new StringBuilder();
sb.append(appId).append(",").append(traceStartTimeMS).append(",")
.append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
.append(",").append(simulateEndTimeMS);
jobRuntimeLogBW.write(sb.toString() + EOL);
jobRuntimeLogBW.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
private void updateQueueMetrics(String queue,
int releasedMemory, int releasedVCores) {
// update queue counters
SortedMap<String, Counter> counterMap = metrics.getCounters();
if (releasedMemory != 0) {
String name = "counter.queue." + queue + ".allocated.memory";
if (! counterMap.containsKey(name)) {
metrics.counter(name);
counterMap = metrics.getCounters();
}
counterMap.get(name).inc(-releasedMemory);
}
if (releasedVCores != 0) {
String name = "counter.queue." + queue + ".allocated.cores";
if (! counterMap.containsKey(name)) {
metrics.counter(name);
counterMap = metrics.getCounters();
}
counterMap.get(name).inc(-releasedVCores);
}
}
public void setQueueSet(Set<String> queues) {
this.queueSet = queues;
}
public Set<String> getQueueSet() {
return this.queueSet;
}
public void setTrackedAppSet(Set<String> apps) {
this.trackedAppSet = apps;
}
public Set<String> getTrackedAppSet() {
return this.trackedAppSet;
}
public MetricRegistry getMetrics() {
return metrics;
}
public SchedulerMetrics getSchedulerMetrics() {
return schedulerMetrics;
}
// API open to out classes
public void addTrackedApp(ApplicationAttemptId appAttemptId,
String oldAppId) {
if (metricsON) {
schedulerMetrics.trackApp(appAttemptId, oldAppId);
}
}
public void removeTrackedApp(ApplicationAttemptId appAttemptId,
String oldAppId) {
if (metricsON) {
schedulerMetrics.untrackApp(appAttemptId, oldAppId);
}
}
@Override
public Configuration getConf() {
return conf;
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.scheduler;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import com.codahale.metrics.MetricRegistry;
public interface SchedulerWrapper {
public MetricRegistry getMetrics();
public SchedulerMetrics getSchedulerMetrics();
public Set<String> getQueueSet();
public void setQueueSet(Set<String> queues);
public Set<String> getTrackedAppSet();
public void setTrackedAppSet(Set<String> apps);
public void addTrackedApp(ApplicationAttemptId appAttemptId,
String oldAppId);
public void removeTrackedApp(ApplicationAttemptId appAttemptId,
String oldAppId);
public void addAMRuntime(ApplicationId appId,
long traceStartTimeMS, long traceEndTimeMS,
long simulateStartTimeMS, long simulateEndTimeMS);
}

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.scheduler.FairSchedulerMetrics; import org.apache.hadoop.yarn.sls.scheduler.FairSchedulerMetrics;
import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper; import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
import org.mortbay.jetty.Handler; import org.mortbay.jetty.Handler;
import org.mortbay.jetty.Request; import org.mortbay.jetty.Request;
@ -54,7 +54,7 @@ import com.codahale.metrics.MetricRegistry;
public class SLSWebApp extends HttpServlet { public class SLSWebApp extends HttpServlet {
private static final long serialVersionUID = 1905162041950251407L; private static final long serialVersionUID = 1905162041950251407L;
private transient Server server; private transient Server server;
private transient ResourceSchedulerWrapper wrapper; private transient SchedulerWrapper wrapper;
private transient MetricRegistry metrics; private transient MetricRegistry metrics;
private transient SchedulerMetrics schedulerMetrics; private transient SchedulerMetrics schedulerMetrics;
// metrics objects // metrics objects
@ -103,7 +103,7 @@ public class SLSWebApp extends HttpServlet {
queueAllocatedVCoresCounterMap = new HashMap<>(); queueAllocatedVCoresCounterMap = new HashMap<>();
} }
public SLSWebApp(ResourceSchedulerWrapper wrapper, int metricsAddressPort) { public SLSWebApp(SchedulerWrapper wrapper, int metricsAddressPort) {
this.wrapper = wrapper; this.wrapper = wrapper;
metrics = wrapper.getMetrics(); metrics = wrapper.getMetrics();
handleOperTimecostHistogramMap = handleOperTimecostHistogramMap =

View File

@ -0,0 +1,60 @@
<?xml version="1.0"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!--
This file contains queue allocations for the Capacity Scheduler.
Its format is explained in the Capacity Scheduler documentation at
http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html.
The documentation also includes a sample config file.
-->
<configuration>
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>sls_queue_1,sls_queue_2,sls_queue_3</value>
<description>The queues at the this level (root is the root queue).
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.sls_queue_1.capacity</name>
<value>25</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.sls_queue_1.maximum-capacity</name>
<value>100</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.sls_queue_2.capacity</name>
<value>25</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.sls_queue_2.maximum-capacity</name>
<value>100</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.sls_queue_3.capacity</name>
<value>50</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.sls_queue_3.maximum-capacity</name>
<value>100</value>
</property>
</configuration>

View File

@ -17,7 +17,18 @@
<configuration> <configuration>
<property> <property>
<name>yarn.resourcemanager.scheduler.class</name> <name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value> <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
<!-- <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value> -->
</property>
<property>
<name>yarn.resourcemanager.scheduler.monitor.enable</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.monitor.policies</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy</value>
</property> </property>
<property> <property>