YARN-5654. Not be able to run SLS with FairScheduler (yufeigu via rkanter)

This commit is contained in:
Robert Kanter 2017-03-29 16:18:13 -07:00
parent 4966a6e26e
commit 6a5516c238
12 changed files with 1103 additions and 1631 deletions

View File

@ -59,16 +59,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.scheduler.*;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Logger;
@ -152,9 +150,9 @@ public class SLSRunner {
// start application masters
startAM();
// set queue & tracked apps information
((SchedulerWrapper) rm.getResourceScheduler())
((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
.setQueueSet(this.queueAppNumMap.keySet());
((SchedulerWrapper) rm.getResourceScheduler())
((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
.setTrackedAppSet(this.trackedApps);
// print out simulation info
printSimulationInfo();
@ -164,7 +162,7 @@ public class SLSRunner {
runner.start();
}
private void startRM() throws IOException, ClassNotFoundException {
private void startRM() throws Exception {
Configuration rmConf = new YarnConfiguration();
String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
@ -175,10 +173,12 @@ public class SLSRunner {
if(Class.forName(schedulerClass) == CapacityScheduler.class) {
rmConf.set(YarnConfiguration.RM_SCHEDULER,
SLSCapacityScheduler.class.getName());
} else {
} else if (Class.forName(schedulerClass) == FairScheduler.class) {
rmConf.set(YarnConfiguration.RM_SCHEDULER,
ResourceSchedulerWrapper.class.getName());
rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass);
SLSFairScheduler.class.getName());
} else if (Class.forName(schedulerClass) == FifoScheduler.class){
// TODO add support for FifoScheduler
throw new Exception("Fifo Scheduler is not supported yet.");
}
rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Logger;
@ -219,10 +220,12 @@ public abstract class AMSimulator extends TaskRunner.Task {
simulateFinishTimeMS = System.currentTimeMillis() -
SLSRunner.getRunner().getStartTimeMS();
// record job running information
((SchedulerWrapper)rm.getResourceScheduler())
.addAMRuntime(appId,
traceStartTimeMS, traceFinishTimeMS,
simulateStartTimeMS, simulateFinishTimeMS);
SchedulerMetrics schedulerMetrics =
((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
if (schedulerMetrics != null) {
schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS,
simulateStartTimeMS, simulateFinishTimeMS);
}
}
protected ResourceRequest createResourceRequest(
@ -330,13 +333,20 @@ public abstract class AMSimulator extends TaskRunner.Task {
private void trackApp() {
if (isTracked) {
((SchedulerWrapper) rm.getResourceScheduler())
.addTrackedApp(appId, oldAppId);
SchedulerMetrics schedulerMetrics =
((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
if (schedulerMetrics != null) {
schedulerMetrics.addTrackedApp(appId, oldAppId);
}
}
}
public void untrackApp() {
if (isTracked) {
((SchedulerWrapper) rm.getResourceScheduler()).removeTrackedApp(oldAppId);
SchedulerMetrics schedulerMetrics =
((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
if (schedulerMetrics != null) {
schedulerMetrics.removeTrackedApp(oldAppId);
}
}
}

View File

@ -1,969 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.scheduler;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.web.SLSWebApp;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Logger;
import com.codahale.metrics.Counter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.Timer;
@Private
@Unstable
final public class ResourceSchedulerWrapper
extends AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>
implements SchedulerWrapper, ResourceScheduler, Configurable {
private static final String EOL = System.getProperty("line.separator");
private static final int SAMPLING_SIZE = 60;
private ScheduledExecutorService pool;
// counters for scheduler allocate/handle operations
private Counter schedulerAllocateCounter;
private Counter schedulerHandleCounter;
private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
// Timers for scheduler allocate/handle operations
private Timer schedulerAllocateTimer;
private Timer schedulerHandleTimer;
private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
private List<Histogram> schedulerHistogramList;
private Map<Histogram, Timer> histogramTimerMap;
private Lock samplerLock;
private Lock queueLock;
private Configuration conf;
private ResourceScheduler scheduler;
private Map<ApplicationId, String> appQueueMap =
new ConcurrentHashMap<ApplicationId, String>();
private BufferedWriter jobRuntimeLogBW;
// Priority of the ResourceSchedulerWrapper shutdown hook.
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
// web app
private SLSWebApp web;
private Map<ContainerId, Resource> preemptionContainerMap =
new ConcurrentHashMap<ContainerId, Resource>();
// metrics
private MetricRegistry metrics;
private SchedulerMetrics schedulerMetrics;
private boolean metricsON;
private String metricsOutputDir;
private BufferedWriter metricsLogBW;
private boolean running = false;
private static Map<Class, Class> defaultSchedulerMetricsMap =
new HashMap<Class, Class>();
static {
defaultSchedulerMetricsMap.put(FairScheduler.class,
FairSchedulerMetrics.class);
defaultSchedulerMetricsMap.put(FifoScheduler.class,
FifoSchedulerMetrics.class);
defaultSchedulerMetricsMap.put(CapacityScheduler.class,
CapacitySchedulerMetrics.class);
}
// must set by outside
private Set<String> queueSet;
private Set<String> trackedAppSet;
public final Logger LOG = Logger.getLogger(ResourceSchedulerWrapper.class);
public ResourceSchedulerWrapper() {
super(ResourceSchedulerWrapper.class.getName());
samplerLock = new ReentrantLock();
queueLock = new ReentrantLock();
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
// set scheduler
Class<? extends ResourceScheduler> klass = conf.getClass(
SLSConfiguration.RM_SCHEDULER, null, ResourceScheduler.class);
scheduler = ReflectionUtils.newInstance(klass, conf);
// start metrics
metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
if (metricsON) {
try {
initMetrics();
} catch (Exception e) {
e.printStackTrace();
}
}
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override
public void run() {
try {
if (metricsLogBW != null) {
metricsLogBW.write("]");
metricsLogBW.close();
}
if (web != null) {
web.stop();
}
tearDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}, SHUTDOWN_HOOK_PRIORITY);
}
@Override
public Allocation allocate(ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
List<String> strings, List<String> strings2,
ContainerUpdates updateRequests) {
if (metricsON) {
final Timer.Context context = schedulerAllocateTimer.time();
Allocation allocation = null;
try {
allocation = scheduler.allocate(attemptId, resourceRequests,
containerIds, strings, strings2, updateRequests);
return allocation;
} finally {
context.stop();
schedulerAllocateCounter.inc();
try {
updateQueueWithAllocateRequest(allocation, attemptId,
resourceRequests, containerIds);
} catch (IOException e) {
e.printStackTrace();
}
}
} else {
return scheduler.allocate(attemptId,
resourceRequests, containerIds, strings, strings2, updateRequests);
}
}
@Override
public void handle(SchedulerEvent schedulerEvent) {
// metrics off
if (! metricsON) {
scheduler.handle(schedulerEvent);
return;
}
if(!running) running = true;
// metrics on
Timer.Context handlerTimer = null;
Timer.Context operationTimer = null;
NodeUpdateSchedulerEventWrapper eventWrapper;
try {
//if (schedulerEvent instanceof NodeUpdateSchedulerEvent) {
if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
&& schedulerEvent instanceof NodeUpdateSchedulerEvent) {
eventWrapper = new NodeUpdateSchedulerEventWrapper(
(NodeUpdateSchedulerEvent)schedulerEvent);
schedulerEvent = eventWrapper;
updateQueueWithNodeUpdate(eventWrapper);
} else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
// check if having AM Container, update resource usage information
AppAttemptRemovedSchedulerEvent appRemoveEvent =
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
ApplicationAttemptId appAttemptId =
appRemoveEvent.getApplicationAttemptID();
String queue = appQueueMap.get(appAttemptId.getApplicationId());
SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId);
if (! app.getLiveContainers().isEmpty()) { // have 0 or 1
// should have one container which is AM container
RMContainer rmc = app.getLiveContainers().iterator().next();
updateQueueMetrics(queue,
rmc.getContainer().getResource().getMemorySize(),
rmc.getContainer().getResource().getVirtualCores());
}
}
handlerTimer = schedulerHandleTimer.time();
operationTimer = schedulerHandleTimerMap
.get(schedulerEvent.getType()).time();
scheduler.handle(schedulerEvent);
} finally {
if (handlerTimer != null) handlerTimer.stop();
if (operationTimer != null) operationTimer.stop();
schedulerHandleCounter.inc();
schedulerHandleCounterMap.get(schedulerEvent.getType()).inc();
if (schedulerEvent.getType() == SchedulerEventType.APP_REMOVED
&& schedulerEvent instanceof AppRemovedSchedulerEvent) {
SLSRunner.decreaseRemainingApps();
AppRemovedSchedulerEvent appRemoveEvent =
(AppRemovedSchedulerEvent) schedulerEvent;
appQueueMap.remove(appRemoveEvent.getApplicationID());
} else if (schedulerEvent.getType() == SchedulerEventType.APP_ADDED
&& schedulerEvent instanceof AppAddedSchedulerEvent) {
AppAddedSchedulerEvent appAddEvent =
(AppAddedSchedulerEvent) schedulerEvent;
String queueName = appAddEvent.getQueue();
appQueueMap.put(appAddEvent.getApplicationId(), queueName);
}
}
}
private void updateQueueWithNodeUpdate(
NodeUpdateSchedulerEventWrapper eventWrapper) {
RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
for (UpdatedContainerInfo info : containerList) {
for (ContainerStatus status : info.getCompletedContainers()) {
ContainerId containerId = status.getContainerId();
SchedulerAppReport app = scheduler.getSchedulerAppInfo(
containerId.getApplicationAttemptId());
if (app == null) {
// this happens for the AM container
// The app have already removed when the NM sends the release
// information.
continue;
}
String queue =
appQueueMap.get(containerId.getApplicationAttemptId()
.getApplicationId());
int releasedMemory = 0, releasedVCores = 0;
if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
for (RMContainer rmc : app.getLiveContainers()) {
if (rmc.getContainerId() == containerId) {
releasedMemory += rmc.getContainer().getResource().getMemorySize();
releasedVCores += rmc.getContainer()
.getResource().getVirtualCores();
break;
}
}
} else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
if (preemptionContainerMap.containsKey(containerId)) {
Resource preResource = preemptionContainerMap.get(containerId);
releasedMemory += preResource.getMemorySize();
releasedVCores += preResource.getVirtualCores();
preemptionContainerMap.remove(containerId);
}
}
// update queue counters
updateQueueMetrics(queue, releasedMemory, releasedVCores);
}
}
}
private void updateQueueWithAllocateRequest(Allocation allocation,
ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests,
List<ContainerId> containerIds) throws IOException {
// update queue information
Resource pendingResource = Resources.createResource(0, 0);
Resource allocatedResource = Resources.createResource(0, 0);
String queueName = appQueueMap.get(attemptId.getApplicationId());
// container requested
for (ResourceRequest request : resourceRequests) {
if (request.getResourceName().equals(ResourceRequest.ANY)) {
Resources.addTo(pendingResource,
Resources.multiply(request.getCapability(),
request.getNumContainers()));
}
}
// container allocated
for (Container container : allocation.getContainers()) {
Resources.addTo(allocatedResource, container.getResource());
Resources.subtractFrom(pendingResource, container.getResource());
}
// container released from AM
SchedulerAppReport report = scheduler.getSchedulerAppInfo(attemptId);
for (ContainerId containerId : containerIds) {
Container container = null;
for (RMContainer c : report.getLiveContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
// released allocated containers
Resources.subtractFrom(allocatedResource, container.getResource());
} else {
for (RMContainer c : report.getReservedContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
// released reserved containers
Resources.subtractFrom(pendingResource, container.getResource());
}
}
}
// containers released/preemption from scheduler
Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
if (allocation.getContainerPreemptions() != null) {
preemptionContainers.addAll(allocation.getContainerPreemptions());
}
if (allocation.getStrictContainerPreemptions() != null) {
preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
}
if (! preemptionContainers.isEmpty()) {
for (ContainerId containerId : preemptionContainers) {
if (! preemptionContainerMap.containsKey(containerId)) {
Container container = null;
for (RMContainer c : report.getLiveContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
preemptionContainerMap.put(containerId, container.getResource());
}
}
}
}
// update metrics
SortedMap<String, Counter> counterMap = metrics.getCounters();
String names[] = new String[]{
"counter.queue." + queueName + ".pending.memory",
"counter.queue." + queueName + ".pending.cores",
"counter.queue." + queueName + ".allocated.memory",
"counter.queue." + queueName + ".allocated.cores"};
long values[] = new long[]{pendingResource.getMemorySize(),
pendingResource.getVirtualCores(),
allocatedResource.getMemorySize(), allocatedResource.getVirtualCores()};
for (int i = names.length - 1; i >= 0; i --) {
if (! counterMap.containsKey(names[i])) {
metrics.counter(names[i]);
counterMap = metrics.getCounters();
}
counterMap.get(names[i]).inc(values[i]);
}
queueLock.lock();
try {
if (! schedulerMetrics.isTracked(queueName)) {
schedulerMetrics.trackQueue(queueName);
}
} finally {
queueLock.unlock();
}
}
private void tearDown() throws IOException {
// close job runtime writer
if (jobRuntimeLogBW != null) {
jobRuntimeLogBW.close();
}
// shut pool
if (pool != null) pool.shutdown();
}
@SuppressWarnings("unchecked")
private void initMetrics() throws Exception {
metrics = new MetricRegistry();
// configuration
metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
int metricsWebAddressPort = conf.getInt(
SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
// create SchedulerMetrics for current scheduler
String schedulerMetricsType = conf.get(scheduler.getClass().getName());
Class schedulerMetricsClass = schedulerMetricsType == null?
defaultSchedulerMetricsMap.get(scheduler.getClass()) :
Class.forName(schedulerMetricsType);
schedulerMetrics = (SchedulerMetrics)ReflectionUtils
.newInstance(schedulerMetricsClass, new Configuration());
schedulerMetrics.init(scheduler, metrics);
// register various metrics
registerJvmMetrics();
registerClusterResourceMetrics();
registerContainerAppNumMetrics();
registerSchedulerMetrics();
// .csv output
initMetricsCSVOutput();
// start web app to provide real-time tracking
web = new SLSWebApp(this, metricsWebAddressPort);
web.start();
// a thread to update histogram timer
pool = new ScheduledThreadPoolExecutor(2);
pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
TimeUnit.MILLISECONDS);
// a thread to output metrics for real-tiem tracking
pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
TimeUnit.MILLISECONDS);
// application running information
jobRuntimeLogBW =
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
metricsOutputDir + "/jobruntime.csv"), "UTF-8"));
jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
"simulate_start_time,simulate_end_time" + EOL);
jobRuntimeLogBW.flush();
}
private void registerJvmMetrics() {
// add JVM gauges
metrics.register("variable.jvm.free.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().freeMemory();
}
}
);
metrics.register("variable.jvm.max.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().maxMemory();
}
}
);
metrics.register("variable.jvm.total.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().totalMemory();
}
}
);
}
private void registerClusterResourceMetrics() {
metrics.register("variable.cluster.allocated.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAllocatedMB();
}
}
}
);
metrics.register("variable.cluster.allocated.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
} else {
return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
}
}
}
);
metrics.register("variable.cluster.available.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAvailableMB();
}
}
}
);
metrics.register("variable.cluster.available.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
} else {
return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
}
}
}
);
}
private void registerContainerAppNumMetrics() {
metrics.register("variable.running.application",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if (scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
} else {
return scheduler.getRootQueueMetrics().getAppsRunning();
}
}
}
);
metrics.register("variable.running.container",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
} else {
return scheduler.getRootQueueMetrics().getAllocatedContainers();
}
}
}
);
}
private void registerSchedulerMetrics() {
samplerLock.lock();
try {
// counters for scheduler operations
schedulerAllocateCounter = metrics.counter(
"counter.scheduler.operation.allocate");
schedulerHandleCounter = metrics.counter(
"counter.scheduler.operation.handle");
schedulerHandleCounterMap = new HashMap<SchedulerEventType, Counter>();
for (SchedulerEventType e : SchedulerEventType.values()) {
Counter counter = metrics.counter(
"counter.scheduler.operation.handle." + e);
schedulerHandleCounterMap.put(e, counter);
}
// timers for scheduler operations
int timeWindowSize = conf.getInt(
SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
schedulerAllocateTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>();
for (SchedulerEventType e : SchedulerEventType.values()) {
Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimerMap.put(e, timer);
}
// histogram for scheduler operations (Samplers)
schedulerHistogramList = new ArrayList<Histogram>();
histogramTimerMap = new HashMap<Histogram, Timer>();
Histogram schedulerAllocateHistogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register("sampler.scheduler.operation.allocate.timecost",
schedulerAllocateHistogram);
schedulerHistogramList.add(schedulerAllocateHistogram);
histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer);
Histogram schedulerHandleHistogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register("sampler.scheduler.operation.handle.timecost",
schedulerHandleHistogram);
schedulerHistogramList.add(schedulerHandleHistogram);
histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
for (SchedulerEventType e : SchedulerEventType.values()) {
Histogram histogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register(
"sampler.scheduler.operation.handle." + e + ".timecost",
histogram);
schedulerHistogramList.add(histogram);
histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
}
} finally {
samplerLock.unlock();
}
}
private void initMetricsCSVOutput() {
int timeIntervalMS = conf.getInt(
SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
File dir = new File(metricsOutputDir + "/metrics");
if(! dir.exists()
&& ! dir.mkdirs()) {
LOG.error("Cannot create directory " + dir.getAbsoluteFile());
}
final CsvReporter reporter = CsvReporter.forRegistry(metrics)
.formatFor(Locale.US)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build(new File(metricsOutputDir + "/metrics"));
reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
}
class HistogramsRunnable implements Runnable {
@Override
public void run() {
samplerLock.lock();
try {
for (Histogram histogram : schedulerHistogramList) {
Timer timer = histogramTimerMap.get(histogram);
histogram.update((int) timer.getSnapshot().getMean());
}
} finally {
samplerLock.unlock();
}
}
}
class MetricsLogRunnable implements Runnable {
private boolean firstLine = true;
public MetricsLogRunnable() {
try {
metricsLogBW =
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
metricsOutputDir + "/realtimetrack.json"), "UTF-8"));
metricsLogBW.write("[");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
if(running) {
// all WebApp to get real tracking json
String metrics = web.generateRealTimeTrackingMetrics();
// output
try {
if(firstLine) {
metricsLogBW.write(metrics + EOL);
firstLine = false;
} else {
metricsLogBW.write("," + metrics + EOL);
}
metricsLogBW.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
// the following functions are used by AMSimulator
public void addAMRuntime(ApplicationId appId,
long traceStartTimeMS, long traceEndTimeMS,
long simulateStartTimeMS, long simulateEndTimeMS) {
if (metricsON) {
try {
// write job runtime information
StringBuilder sb = new StringBuilder();
sb.append(appId).append(",").append(traceStartTimeMS).append(",")
.append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
.append(",").append(simulateEndTimeMS);
jobRuntimeLogBW.write(sb.toString() + EOL);
jobRuntimeLogBW.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void updateQueueMetrics(String queue,
long releasedMemory, int releasedVCores) {
// update queue counters
SortedMap<String, Counter> counterMap = metrics.getCounters();
if (releasedMemory != 0) {
String name = "counter.queue." + queue + ".allocated.memory";
if (! counterMap.containsKey(name)) {
metrics.counter(name);
counterMap = metrics.getCounters();
}
counterMap.get(name).inc(-releasedMemory);
}
if (releasedVCores != 0) {
String name = "counter.queue." + queue + ".allocated.cores";
if (! counterMap.containsKey(name)) {
metrics.counter(name);
counterMap = metrics.getCounters();
}
counterMap.get(name).inc(-releasedVCores);
}
}
public void setQueueSet(Set<String> queues) {
this.queueSet = queues;
}
public Set<String> getQueueSet() {
return this.queueSet;
}
public void setTrackedAppSet(Set<String> apps) {
this.trackedAppSet = apps;
}
public Set<String> getTrackedAppSet() {
return this.trackedAppSet;
}
public MetricRegistry getMetrics() {
return metrics;
}
public SchedulerMetrics getSchedulerMetrics() {
return schedulerMetrics;
}
// API open to out classes
public void addTrackedApp(ApplicationId appId, String oldAppId) {
if (metricsON) {
schedulerMetrics.trackApp(appId, oldAppId);
}
}
public void removeTrackedApp(String oldAppId) {
if (metricsON) {
schedulerMetrics.untrackApp(oldAppId);
}
}
@Override
public Configuration getConf() {
return conf;
}
@SuppressWarnings("unchecked")
@Override
public void serviceInit(Configuration conf) throws Exception {
((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
scheduler).init(conf);
super.serviceInit(conf);
initScheduler(conf);
}
private synchronized void initScheduler(Configuration configuration) throws
IOException {
this.applications =
new ConcurrentHashMap<ApplicationId,
SchedulerApplication<SchedulerApplicationAttempt>>();
}
@SuppressWarnings("unchecked")
@Override
public void serviceStart() throws Exception {
((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
scheduler).start();
super.serviceStart();
}
@SuppressWarnings("unchecked")
@Override
public void serviceStop() throws Exception {
((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
scheduler).stop();
super.serviceStop();
}
@Override
public void setRMContext(RMContext rmContext) {
scheduler.setRMContext(rmContext);
}
@Override
public void reinitialize(Configuration conf, RMContext rmContext)
throws IOException {
scheduler.reinitialize(conf, rmContext);
}
@Override
public void recover(RMStateStore.RMState rmState) throws Exception {
scheduler.recover(rmState);
}
@Override
public QueueInfo getQueueInfo(String s, boolean b, boolean b2)
throws IOException {
return scheduler.getQueueInfo(s, b, b2);
}
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo() {
return scheduler.getQueueUserAclInfo();
}
@Override
public Resource getMinimumResourceCapability() {
return scheduler.getMinimumResourceCapability();
}
@Override
public Resource getMaximumResourceCapability() {
return scheduler.getMaximumResourceCapability();
}
@Override
public ResourceCalculator getResourceCalculator() {
return scheduler.getResourceCalculator();
}
@Override
public int getNumClusterNodes() {
return scheduler.getNumClusterNodes();
}
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
return scheduler.getNodeReport(nodeId);
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId attemptId) {
return scheduler.getSchedulerAppInfo(attemptId);
}
@Override
public QueueMetrics getRootQueueMetrics() {
return scheduler.getRootQueueMetrics();
}
@Override
public synchronized boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName) {
return scheduler.checkAccess(callerUGI, acl, queueName);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
return scheduler.getAppResourceUsageReport(appAttemptId);
}
@Override
public List<ApplicationAttemptId> getAppsInQueue(String queue) {
return scheduler.getAppsInQueue(queue);
}
@Override
public RMContainer getRMContainer(ContainerId containerId) {
return null;
}
@Override
public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException {
return scheduler.moveApplication(appId, newQueue);
}
@Override
@LimitedPrivate("yarn")
@Unstable
public Resource getClusterResource() {
return super.getClusterResource();
}
@Override
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
return new ArrayList<Container>();
}
@Override
public Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>>
getSchedulerApplications() {
return new HashMap<ApplicationId,
SchedulerApplication<SchedulerApplicationAttempt>>();
}
@Override
protected void completedContainerInternal(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
// do nothing
}
@Override
public Priority checkAndGetApplicationPriority(Priority priority,
UserGroupInformation user, String queueName, ApplicationId applicationId)
throws YarnException {
// TODO Dummy implementation.
return Priority.newInstance(0);
}
}

View File

@ -17,34 +17,19 @@
*/
package org.apache.hadoop.yarn.sls.scheduler;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -65,117 +50,63 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.web.SLSWebApp;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Logger;
import com.codahale.metrics.Counter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.Timer;
@Private
@Unstable
public class SLSCapacityScheduler extends CapacityScheduler implements
SchedulerWrapper,Configurable {
private static final String EOL = System.getProperty("line.separator");
private static final String QUEUE_COUNTER_PREFIX = "counter.queue.";
private static final int SAMPLING_SIZE = 60;
private ScheduledExecutorService pool;
// counters for scheduler allocate/handle operations
private Counter schedulerAllocateCounter;
private Counter schedulerHandleCounter;
private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
// Timers for scheduler allocate/handle operations
private Timer schedulerAllocateTimer;
private Timer schedulerHandleTimer;
private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
private List<Histogram> schedulerHistogramList;
private Map<Histogram, Timer> histogramTimerMap;
private Lock samplerLock;
private Lock queueLock;
private Configuration conf;
private Map<ApplicationAttemptId, String> appQueueMap =
new ConcurrentHashMap<ApplicationAttemptId, String>();
private BufferedWriter jobRuntimeLogBW;
// Priority of the ResourceSchedulerWrapper shutdown hook.
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
// web app
private SLSWebApp web;
private Map<ContainerId, Resource> preemptionContainerMap =
new ConcurrentHashMap<ContainerId, Resource>();
// metrics
private MetricRegistry metrics;
private SchedulerMetrics schedulerMetrics;
private boolean metricsON;
private String metricsOutputDir;
private BufferedWriter metricsLogBW;
private boolean running = false;
private static Map<Class, Class> defaultSchedulerMetricsMap =
new HashMap<Class, Class>();
static {
defaultSchedulerMetricsMap.put(FairScheduler.class,
FairSchedulerMetrics.class);
defaultSchedulerMetricsMap.put(FifoScheduler.class,
FifoSchedulerMetrics.class);
defaultSchedulerMetricsMap.put(CapacityScheduler.class,
CapacitySchedulerMetrics.class);
}
// must set by outside
private Set<String> queueSet;
private Set<String> trackedAppSet;
private Tracker tracker;
public final Logger LOG = Logger.getLogger(SLSCapacityScheduler.class);
public Tracker getTracker() {
return tracker;
}
public SLSCapacityScheduler() {
samplerLock = new ReentrantLock();
queueLock = new ReentrantLock();
tracker = new Tracker();
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
super.setConf(conf);
// start metrics
metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
if (metricsON) {
try {
initMetrics();
schedulerMetrics = SchedulerMetrics.getInstance(conf,
CapacityScheduler.class);
schedulerMetrics.init(this, conf);
} catch (Exception e) {
e.printStackTrace();
}
}
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override
public void run() {
try {
if (metricsLogBW != null) {
metricsLogBW.write("]");
metricsLogBW.close();
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override public void run() {
try {
schedulerMetrics.tearDown();
} catch (Exception e) {
e.printStackTrace();
}
if (web != null) {
web.stop();
}
tearDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}, SHUTDOWN_HOOK_PRIORITY);
}, SLSUtils.SHUTDOWN_HOOK_PRIORITY);
}
}
@Override
@ -184,7 +115,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
List<String> strings, List<String> strings2,
ContainerUpdates updateRequests) {
if (metricsON) {
final Timer.Context context = schedulerAllocateTimer.time();
final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
.time();
Allocation allocation = null;
try {
allocation = super
@ -193,7 +125,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
return allocation;
} finally {
context.stop();
schedulerAllocateCounter.inc();
schedulerMetrics.increaseSchedulerAllocationCounter();
try {
updateQueueWithAllocateRequest(allocation, attemptId,
resourceRequests, containerIds);
@ -209,74 +141,76 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
@Override
public void handle(SchedulerEvent schedulerEvent) {
// metrics off
if (! metricsON) {
super.handle(schedulerEvent);
return;
}
if(!running) running = true;
if (!metricsON) {
super.handle(schedulerEvent);
return;
}
// metrics on
Timer.Context handlerTimer = null;
Timer.Context operationTimer = null;
if (!schedulerMetrics.isRunning()) {
schedulerMetrics.setRunning(true);
}
NodeUpdateSchedulerEventWrapper eventWrapper;
try {
//if (schedulerEvent instanceof NodeUpdateSchedulerEvent) {
if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
&& schedulerEvent instanceof NodeUpdateSchedulerEvent) {
eventWrapper = new NodeUpdateSchedulerEventWrapper(
(NodeUpdateSchedulerEvent)schedulerEvent);
schedulerEvent = eventWrapper;
updateQueueWithNodeUpdate(eventWrapper);
} else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
// check if having AM Container, update resource usage information
AppAttemptRemovedSchedulerEvent appRemoveEvent =
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
ApplicationAttemptId appAttemptId =
appRemoveEvent.getApplicationAttemptID();
String queue = appQueueMap.get(appAttemptId);
SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId);
if (! app.getLiveContainers().isEmpty()) { // have 0 or 1
// should have one container which is AM container
RMContainer rmc = app.getLiveContainers().iterator().next();
updateQueueMetrics(queue,
rmc.getContainer().getResource().getMemorySize(),
rmc.getContainer().getResource().getVirtualCores());
}
}
Timer.Context handlerTimer = null;
Timer.Context operationTimer = null;
handlerTimer = schedulerHandleTimer.time();
operationTimer = schedulerHandleTimerMap
.get(schedulerEvent.getType()).time();
NodeUpdateSchedulerEventWrapper eventWrapper;
try {
if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
&& schedulerEvent instanceof NodeUpdateSchedulerEvent) {
eventWrapper = new NodeUpdateSchedulerEventWrapper(
(NodeUpdateSchedulerEvent)schedulerEvent);
schedulerEvent = eventWrapper;
updateQueueWithNodeUpdate(eventWrapper);
} else if (schedulerEvent.getType() ==
SchedulerEventType.APP_ATTEMPT_REMOVED
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
// check if having AM Container, update resource usage information
AppAttemptRemovedSchedulerEvent appRemoveEvent =
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
ApplicationAttemptId appAttemptId =
appRemoveEvent.getApplicationAttemptID();
String queue = appQueueMap.get(appAttemptId);
SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId);
if (!app.getLiveContainers().isEmpty()) { // have 0 or 1
// should have one container which is AM container
RMContainer rmc = app.getLiveContainers().iterator().next();
schedulerMetrics.updateQueueMetricsByRelease(
rmc.getContainer().getResource(), queue);
}
}
super.handle(schedulerEvent);
} finally {
if (handlerTimer != null) handlerTimer.stop();
if (operationTimer != null) operationTimer.stop();
schedulerHandleCounter.inc();
schedulerHandleCounterMap.get(schedulerEvent.getType()).inc();
handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
operationTimer = schedulerMetrics.getSchedulerHandleTimer(
schedulerEvent.getType()).time();
if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
SLSRunner.decreaseRemainingApps();
AppAttemptRemovedSchedulerEvent appRemoveEvent =
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
ApplicationAttemptId appAttemptId =
appRemoveEvent.getApplicationAttemptID();
appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
} else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_ADDED
&& schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
AppAttemptAddedSchedulerEvent appAddEvent =
(AppAttemptAddedSchedulerEvent) schedulerEvent;
SchedulerApplication app =
applications.get(appAddEvent.getApplicationAttemptId()
super.handle(schedulerEvent);
} finally {
if (handlerTimer != null) {
handlerTimer.stop();
}
if (operationTimer != null) {
operationTimer.stop();
}
schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
SLSRunner.decreaseRemainingApps();
AppAttemptRemovedSchedulerEvent appRemoveEvent =
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
} else if (schedulerEvent.getType() ==
SchedulerEventType.APP_ATTEMPT_ADDED
&& schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
AppAttemptAddedSchedulerEvent appAddEvent =
(AppAttemptAddedSchedulerEvent) schedulerEvent;
SchedulerApplication app =
applications.get(appAddEvent.getApplicationAttemptId()
.getApplicationId());
appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
.getQueueName());
}
}
appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
.getQueueName());
}
}
}
private void updateQueueWithNodeUpdate(
@ -316,7 +250,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
}
}
// update queue counters
updateQueueMetrics(queue, releasedMemory, releasedVCores);
schedulerMetrics.updateQueueMetricsByRelease(
Resource.newInstance(releasedMemory, releasedVCores), queue);
}
}
}
@ -395,410 +330,13 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
}
// update metrics
SortedMap<String, Counter> counterMap = metrics.getCounters();
String names[] = new String[]{
"counter.queue." + queueName + ".pending.memory",
"counter.queue." + queueName + ".pending.cores",
"counter.queue." + queueName + ".allocated.memory",
"counter.queue." + queueName + ".allocated.cores"};
long values[] = new long[]{pendingResource.getMemorySize(),
pendingResource.getVirtualCores(),
allocatedResource.getMemorySize(), allocatedResource.getVirtualCores()};
for (int i = names.length - 1; i >= 0; i --) {
if (! counterMap.containsKey(names[i])) {
metrics.counter(names[i]);
counterMap = metrics.getCounters();
}
counterMap.get(names[i]).inc(values[i]);
}
queueLock.lock();
try {
if (! schedulerMetrics.isTracked(queueName)) {
schedulerMetrics.trackQueue(queueName);
}
} finally {
queueLock.unlock();
}
}
private void tearDown() throws IOException {
// close job runtime writer
if (jobRuntimeLogBW != null) {
jobRuntimeLogBW.close();
}
// shut pool
if (pool != null) pool.shutdown();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void initMetrics() throws Exception {
metrics = new MetricRegistry();
// configuration
metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
int metricsWebAddressPort = conf.getInt(
SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
// create SchedulerMetrics for current scheduler
String schedulerMetricsType = conf.get(CapacityScheduler.class.getName());
Class schedulerMetricsClass = schedulerMetricsType == null?
defaultSchedulerMetricsMap.get(CapacityScheduler.class) :
Class.forName(schedulerMetricsType);
schedulerMetrics = (SchedulerMetrics)ReflectionUtils
.newInstance(schedulerMetricsClass, new Configuration());
schedulerMetrics.init(this, metrics);
// register various metrics
registerJvmMetrics();
registerClusterResourceMetrics();
registerContainerAppNumMetrics();
registerSchedulerMetrics();
// .csv output
initMetricsCSVOutput();
// start web app to provide real-time tracking
web = new SLSWebApp(this, metricsWebAddressPort);
web.start();
// a thread to update histogram timer
pool = new ScheduledThreadPoolExecutor(2);
pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
TimeUnit.MILLISECONDS);
// a thread to output metrics for real-tiem tracking
pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
TimeUnit.MILLISECONDS);
// application running information
jobRuntimeLogBW =
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
metricsOutputDir + "/jobruntime.csv"), "UTF-8"));
jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
"simulate_start_time,simulate_end_time" + EOL);
jobRuntimeLogBW.flush();
}
private void registerJvmMetrics() {
// add JVM gauges
metrics.register("variable.jvm.free.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().freeMemory();
}
}
);
metrics.register("variable.jvm.max.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().maxMemory();
}
}
);
metrics.register("variable.jvm.total.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().totalMemory();
}
}
);
}
private void registerClusterResourceMetrics() {
metrics.register("variable.cluster.allocated.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
if( getRootQueueMetrics() == null) {
return 0L;
} else {
return getRootQueueMetrics().getAllocatedMB();
}
}
}
);
metrics.register("variable.cluster.allocated.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0;
} else {
return getRootQueueMetrics().getAllocatedVirtualCores();
}
}
}
);
metrics.register("variable.cluster.available.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
if(getRootQueueMetrics() == null) {
return 0L;
} else {
return getRootQueueMetrics().getAvailableMB();
}
}
}
);
metrics.register("variable.cluster.available.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0;
} else {
return getRootQueueMetrics().getAvailableVirtualCores();
}
}
}
);
metrics.register("variable.cluster.reserved.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
if(getRootQueueMetrics() == null) {
return 0L;
} else {
return getRootQueueMetrics().getReservedMB();
}
}
}
);
metrics.register("variable.cluster.reserved.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0;
} else {
return getRootQueueMetrics().getReservedVirtualCores();
}
}
}
);
}
private void registerContainerAppNumMetrics() {
metrics.register("variable.running.application",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0;
} else {
return getRootQueueMetrics().getAppsRunning();
}
}
}
);
metrics.register("variable.running.container",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0;
} else {
return getRootQueueMetrics().getAllocatedContainers();
}
}
}
);
}
private void registerSchedulerMetrics() {
samplerLock.lock();
try {
// counters for scheduler operations
schedulerAllocateCounter = metrics.counter(
"counter.scheduler.operation.allocate");
schedulerHandleCounter = metrics.counter(
"counter.scheduler.operation.handle");
schedulerHandleCounterMap = new HashMap<SchedulerEventType, Counter>();
for (SchedulerEventType e : SchedulerEventType.values()) {
Counter counter = metrics.counter(
"counter.scheduler.operation.handle." + e);
schedulerHandleCounterMap.put(e, counter);
}
// timers for scheduler operations
int timeWindowSize = conf.getInt(
SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
schedulerAllocateTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>();
for (SchedulerEventType e : SchedulerEventType.values()) {
Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimerMap.put(e, timer);
}
// histogram for scheduler operations (Samplers)
schedulerHistogramList = new ArrayList<Histogram>();
histogramTimerMap = new HashMap<Histogram, Timer>();
Histogram schedulerAllocateHistogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register("sampler.scheduler.operation.allocate.timecost",
schedulerAllocateHistogram);
schedulerHistogramList.add(schedulerAllocateHistogram);
histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer);
Histogram schedulerHandleHistogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register("sampler.scheduler.operation.handle.timecost",
schedulerHandleHistogram);
schedulerHistogramList.add(schedulerHandleHistogram);
histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
for (SchedulerEventType e : SchedulerEventType.values()) {
Histogram histogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register(
"sampler.scheduler.operation.handle." + e + ".timecost",
histogram);
schedulerHistogramList.add(histogram);
histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
}
} finally {
samplerLock.unlock();
}
}
private void initMetricsCSVOutput() {
int timeIntervalMS = conf.getInt(
SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
File dir = new File(metricsOutputDir + "/metrics");
if(! dir.exists()
&& ! dir.mkdirs()) {
LOG.error("Cannot create directory " + dir.getAbsoluteFile());
}
final CsvReporter reporter = CsvReporter.forRegistry(metrics)
.formatFor(Locale.US)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build(new File(metricsOutputDir + "/metrics"));
reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
}
class HistogramsRunnable implements Runnable {
@Override
public void run() {
samplerLock.lock();
try {
for (Histogram histogram : schedulerHistogramList) {
Timer timer = histogramTimerMap.get(histogram);
histogram.update((int) timer.getSnapshot().getMean());
}
} finally {
samplerLock.unlock();
}
}
}
class MetricsLogRunnable implements Runnable {
private boolean firstLine = true;
public MetricsLogRunnable() {
try {
metricsLogBW =
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
metricsOutputDir + "/realtimetrack.json"), "UTF-8"));
metricsLogBW.write("[");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
if(running) {
// all WebApp to get real tracking json
String metrics = web.generateRealTimeTrackingMetrics();
// output
try {
if(firstLine) {
metricsLogBW.write(metrics + EOL);
firstLine = false;
} else {
metricsLogBW.write("," + metrics + EOL);
}
metricsLogBW.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
// the following functions are used by AMSimulator
public void addAMRuntime(ApplicationId appId,
long traceStartTimeMS, long traceEndTimeMS,
long simulateStartTimeMS, long simulateEndTimeMS) {
if (metricsON) {
try {
// write job runtime information
StringBuilder sb = new StringBuilder();
sb.append(appId).append(",").append(traceStartTimeMS).append(",")
.append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
.append(",").append(simulateEndTimeMS);
jobRuntimeLogBW.write(sb.toString() + EOL);
jobRuntimeLogBW.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void updateQueueMetrics(String queue,
long releasedMemory, int releasedVCores) {
// update queue counters
SortedMap<String, Counter> counterMap = metrics.getCounters();
if (releasedMemory != 0) {
String name = "counter.queue." + queue + ".allocated.memory";
if (! counterMap.containsKey(name)) {
metrics.counter(name);
counterMap = metrics.getCounters();
}
counterMap.get(name).inc(-releasedMemory);
}
if (releasedVCores != 0) {
String name = "counter.queue." + queue + ".allocated.cores";
if (! counterMap.containsKey(name)) {
metrics.counter(name);
counterMap = metrics.getCounters();
}
counterMap.get(name).inc(-releasedVCores);
}
schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
queueName);
}
private void initQueueMetrics(CSQueue queue) {
if (queue instanceof LeafQueue) {
SortedMap<String, Counter> counterMap = metrics.getCounters();
String queueName = queue.getQueueName();
String[] names = new String[]{
QUEUE_COUNTER_PREFIX + queueName + ".pending.memory",
QUEUE_COUNTER_PREFIX + queueName + ".pending.cores",
QUEUE_COUNTER_PREFIX + queueName + ".allocated.memory",
QUEUE_COUNTER_PREFIX + queueName + ".allocated.cores" };
for (int i = names.length - 1; i >= 0; i--) {
if (!counterMap.containsKey(names[i])) {
metrics.counter(names[i]);
counterMap = metrics.getCounters();
}
}
queueLock.lock();
try {
if (!schedulerMetrics.isTracked(queueName)) {
schedulerMetrics.trackQueue(queueName);
}
} finally {
queueLock.unlock();
}
schedulerMetrics.initQueueMetric(queue.getQueueName());
return;
}
@ -811,54 +349,17 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
public void serviceInit(Configuration configuration) throws Exception {
super.serviceInit(configuration);
initQueueMetrics(getRootQueue());
}
public void setQueueSet(Set<String> queues) {
this.queueSet = queues;
}
public Set<String> getQueueSet() {
return this.queueSet;
}
public void setTrackedAppSet(Set<String> apps) {
this.trackedAppSet = apps;
}
public Set<String> getTrackedAppSet() {
return this.trackedAppSet;
}
public MetricRegistry getMetrics() {
return metrics;
if (metricsON) {
initQueueMetrics(getRootQueue());
}
}
public SchedulerMetrics getSchedulerMetrics() {
return schedulerMetrics;
}
// API open to out classes
public void addTrackedApp(ApplicationId appId,
String oldAppId) {
if (metricsON) {
schedulerMetrics.trackApp(appId, oldAppId);
}
}
public void removeTrackedApp(String oldAppId) {
if (metricsON) {
schedulerMetrics.untrackApp(oldAppId);
}
}
@Override
public Configuration getConf() {
return conf;
}
}
}

View File

@ -0,0 +1,339 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.scheduler;
import com.codahale.metrics.Timer;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Private
@Unstable
public class SLSFairScheduler extends FairScheduler
implements SchedulerWrapper, Configurable {
private SchedulerMetrics schedulerMetrics;
private boolean metricsON;
private Tracker tracker;
private Map<ContainerId, Resource> preemptionContainerMap =
new ConcurrentHashMap<>();
public SchedulerMetrics getSchedulerMetrics() {
return schedulerMetrics;
}
public Tracker getTracker() {
return tracker;
}
public SLSFairScheduler() {
tracker = new Tracker();
}
@Override
public void setConf(Configuration conf) {
super.setConfig(conf);
metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
if (metricsON) {
try {
schedulerMetrics = SchedulerMetrics.getInstance(conf,
FairScheduler.class);
schedulerMetrics.init(this, conf);
} catch (Exception e) {
e.printStackTrace();
}
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override public void run() {
try {
schedulerMetrics.tearDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}, SLSUtils.SHUTDOWN_HOOK_PRIORITY);
}
}
@Override
public Allocation allocate(ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
List<String> blacklistAdditions, List<String> blacklistRemovals,
ContainerUpdates updateRequests) {
if (metricsON) {
final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
.time();
Allocation allocation = null;
try {
allocation = super.allocate(attemptId, resourceRequests, containerIds,
blacklistAdditions, blacklistRemovals, updateRequests);
return allocation;
} finally {
context.stop();
schedulerMetrics.increaseSchedulerAllocationCounter();
try {
updateQueueWithAllocateRequest(allocation, attemptId,
resourceRequests, containerIds);
} catch (IOException e) {
e.printStackTrace();
}
}
} else {
return super.allocate(attemptId, resourceRequests, containerIds,
blacklistAdditions, blacklistRemovals, updateRequests);
}
}
@Override
public void handle(SchedulerEvent schedulerEvent) {
// metrics off
if (!metricsON) {
super.handle(schedulerEvent);
return;
}
// metrics on
if(!schedulerMetrics.isRunning()) {
schedulerMetrics.setRunning(true);
}
Timer.Context handlerTimer = null;
Timer.Context operationTimer = null;
NodeUpdateSchedulerEventWrapper eventWrapper;
try {
if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
&& schedulerEvent instanceof NodeUpdateSchedulerEvent) {
eventWrapper = new NodeUpdateSchedulerEventWrapper(
(NodeUpdateSchedulerEvent)schedulerEvent);
schedulerEvent = eventWrapper;
updateQueueWithNodeUpdate(eventWrapper);
} else if (
schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
// check if having AM Container, update resource usage information
AppAttemptRemovedSchedulerEvent appRemoveEvent =
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
ApplicationAttemptId appAttemptId =
appRemoveEvent.getApplicationAttemptID();
String queueName = getSchedulerApp(appAttemptId).getQueue().getName();
SchedulerAppReport app = getSchedulerAppInfo(appAttemptId);
if (!app.getLiveContainers().isEmpty()) { // have 0 or 1
// should have one container which is AM container
RMContainer rmc = app.getLiveContainers().iterator().next();
schedulerMetrics.updateQueueMetricsByRelease(
rmc.getContainer().getResource(), queueName);
}
}
handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
operationTimer = schedulerMetrics.getSchedulerHandleTimer(
schedulerEvent.getType()).time();
super.handle(schedulerEvent);
} finally {
if (handlerTimer != null) {
handlerTimer.stop();
}
if (operationTimer != null) {
operationTimer.stop();
}
schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
SLSRunner.decreaseRemainingApps();
}
}
}
private void updateQueueWithNodeUpdate(
NodeUpdateSchedulerEventWrapper eventWrapper) {
RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
for (UpdatedContainerInfo info : containerList) {
for (ContainerStatus status : info.getCompletedContainers()) {
ContainerId containerId = status.getContainerId();
SchedulerAppReport app = super.getSchedulerAppInfo(
containerId.getApplicationAttemptId());
if (app == null) {
// this happens for the AM container
// The app have already removed when the NM sends the release
// information.
continue;
}
int releasedMemory = 0, releasedVCores = 0;
if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
for (RMContainer rmc : app.getLiveContainers()) {
if (rmc.getContainerId() == containerId) {
Resource resource = rmc.getContainer().getResource();
releasedMemory += resource.getMemorySize();
releasedVCores += resource.getVirtualCores();
break;
}
}
} else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
if (preemptionContainerMap.containsKey(containerId)) {
Resource preResource = preemptionContainerMap.get(containerId);
releasedMemory += preResource.getMemorySize();
releasedVCores += preResource.getVirtualCores();
preemptionContainerMap.remove(containerId);
}
}
// update queue counters
String queue = getSchedulerApp(containerId.getApplicationAttemptId()).
getQueueName();
schedulerMetrics.updateQueueMetricsByRelease(
Resource.newInstance(releasedMemory, releasedVCores), queue);
}
}
}
private void updateQueueWithAllocateRequest(Allocation allocation,
ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests,
List<ContainerId> containerIds) throws IOException {
// update queue information
Resource pendingResource = Resources.createResource(0, 0);
Resource allocatedResource = Resources.createResource(0, 0);
// container requested
for (ResourceRequest request : resourceRequests) {
if (request.getResourceName().equals(ResourceRequest.ANY)) {
Resources.addTo(pendingResource,
Resources.multiply(request.getCapability(),
request.getNumContainers()));
}
}
// container allocated
for (Container container : allocation.getContainers()) {
Resources.addTo(allocatedResource, container.getResource());
Resources.subtractFrom(pendingResource, container.getResource());
}
// container released from AM
SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
for (ContainerId containerId : containerIds) {
Container container = null;
for (RMContainer c : report.getLiveContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
// released allocated containers
Resources.subtractFrom(allocatedResource, container.getResource());
} else {
for (RMContainer c : report.getReservedContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
// released reserved containers
Resources.subtractFrom(pendingResource, container.getResource());
}
}
}
// containers released/preemption from scheduler
Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
if (allocation.getContainerPreemptions() != null) {
preemptionContainers.addAll(allocation.getContainerPreemptions());
}
if (allocation.getStrictContainerPreemptions() != null) {
preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
}
if (!preemptionContainers.isEmpty()) {
for (ContainerId containerId : preemptionContainers) {
if (!preemptionContainerMap.containsKey(containerId)) {
Container container = null;
for (RMContainer c : report.getLiveContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
preemptionContainerMap.put(containerId, container.getResource());
}
}
}
}
// update metrics
String queueName = getSchedulerApp(attemptId).getQueueName();
schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
queueName);
}
private void initQueueMetrics(FSQueue queue) {
if (queue instanceof FSLeafQueue) {
schedulerMetrics.initQueueMetric(queue.getQueueName());
return;
}
for (FSQueue child : queue.getChildQueues()) {
initQueueMetrics(child);
}
}
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
if (metricsON) {
initQueueMetrics(getQueueManager().getRootQueue());
}
}
}

View File

@ -18,40 +18,170 @@
package org.apache.hadoop.yarn.sls.scheduler;
import java.io.BufferedWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.Locale;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Lock;
import com.codahale.metrics.Counter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.Timer;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.web.SLSWebApp;
import org.apache.log4j.Logger;
@Private
@Unstable
public abstract class SchedulerMetrics {
private static final String EOL = System.getProperty("line.separator");
private static final int SAMPLING_SIZE = 60;
private static final Logger LOG = Logger.getLogger(SchedulerMetrics.class);
protected ResourceScheduler scheduler;
protected Set<String> trackedQueues;
protected MetricRegistry metrics;
protected Set<String> appTrackedMetrics;
protected Set<String> queueTrackedMetrics;
private Configuration conf;
private ScheduledExecutorService pool;
private SLSWebApp web;
// metrics
private String metricsOutputDir;
private BufferedWriter metricsLogBW;
private BufferedWriter jobRuntimeLogBW;
private boolean running = false;
// counters for scheduler allocate/handle operations
private Counter schedulerAllocateCounter;
private Counter schedulerHandleCounter;
private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
// Timers for scheduler allocate/handle operations
private Timer schedulerAllocateTimer;
private Timer schedulerHandleTimer;
private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
private List<Histogram> schedulerHistogramList;
private Map<Histogram, Timer> histogramTimerMap;
private Lock samplerLock;
private Lock queueLock;
static Class getSchedulerMetricsClass(Configuration conf,
Class schedulerClass) throws ClassNotFoundException {
Class metricClass = null;
String schedulerMetricsType = conf.get(schedulerClass.getName());
if (schedulerMetricsType != null) {
metricClass = Class.forName(schedulerMetricsType);
}
if (schedulerClass.equals(FairScheduler.class)) {
metricClass = FairSchedulerMetrics.class;
} else if (schedulerClass.equals(CapacityScheduler.class)) {
metricClass = CapacitySchedulerMetrics.class;
} else if (schedulerClass.equals(FifoScheduler.class)) {
metricClass = FifoSchedulerMetrics.class;
}
return metricClass;
}
static SchedulerMetrics getInstance(Configuration conf, Class schedulerClass)
throws ClassNotFoundException {
Class schedulerMetricClass = getSchedulerMetricsClass(conf, schedulerClass);
return (SchedulerMetrics) ReflectionUtils
.newInstance(schedulerMetricClass, new Configuration());
}
public SchedulerMetrics() {
metrics = new MetricRegistry();
appTrackedMetrics = new HashSet<>();
appTrackedMetrics.add("live.containers");
appTrackedMetrics.add("reserved.containers");
queueTrackedMetrics = new HashSet<>();
trackedQueues = new HashSet<>();
samplerLock = new ReentrantLock();
queueLock = new ReentrantLock();
}
public void init(ResourceScheduler scheduler, MetricRegistry metrics) {
this.scheduler = scheduler;
this.trackedQueues = new HashSet<>();
this.metrics = metrics;
void init(ResourceScheduler resourceScheduler, Configuration config)
throws Exception {
this.scheduler = resourceScheduler;
this.conf = config;
metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
// register various metrics
registerJvmMetrics();
registerClusterResourceMetrics();
registerContainerAppNumMetrics();
registerSchedulerMetrics();
// .csv output
initMetricsCSVOutput();
// start web app to provide real-time tracking
int metricsWebAddressPort = conf.getInt(
SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
web = new SLSWebApp((SchedulerWrapper)scheduler, metricsWebAddressPort);
web.start();
// a thread to update histogram timer
pool = new ScheduledThreadPoolExecutor(2);
pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
TimeUnit.MILLISECONDS);
// a thread to output metrics for real-tiem tracking
pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
TimeUnit.MILLISECONDS);
// application running information
jobRuntimeLogBW =
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
metricsOutputDir + "/jobruntime.csv"), "UTF-8"));
jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
"simulate_start_time,simulate_end_time" + EOL);
jobRuntimeLogBW.flush();
}
public MetricRegistry getMetrics() {
return metrics;
}
protected SchedulerApplicationAttempt getSchedulerAppAttempt(
@ -117,7 +247,392 @@ public abstract class SchedulerMetrics {
public Set<String> getAppTrackedMetrics() {
return appTrackedMetrics;
}
public Set<String> getQueueTrackedMetrics() {
return queueTrackedMetrics;
}
private void registerJvmMetrics() {
// add JVM gauges
metrics.register("variable.jvm.free.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().freeMemory();
}
}
);
metrics.register("variable.jvm.max.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().maxMemory();
}
}
);
metrics.register("variable.jvm.total.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
return Runtime.getRuntime().totalMemory();
}
}
);
}
private void registerClusterResourceMetrics() {
metrics.register("variable.cluster.allocated.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
if (scheduler.getRootQueueMetrics() == null) {
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAllocatedMB();
}
}
}
);
metrics.register("variable.cluster.allocated.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if (scheduler.getRootQueueMetrics() == null) {
return 0;
} else {
return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
}
}
}
);
metrics.register("variable.cluster.available.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
if (scheduler.getRootQueueMetrics() == null) {
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAvailableMB();
}
}
}
);
metrics.register("variable.cluster.available.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if (scheduler.getRootQueueMetrics() == null) {
return 0;
} else {
return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
}
}
}
);
}
private void registerContainerAppNumMetrics() {
metrics.register("variable.running.application",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if (scheduler.getRootQueueMetrics() == null) {
return 0;
} else {
return scheduler.getRootQueueMetrics().getAppsRunning();
}
}
}
);
metrics.register("variable.running.container",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if (scheduler.getRootQueueMetrics() == null) {
return 0;
} else {
return scheduler.getRootQueueMetrics().getAllocatedContainers();
}
}
}
);
}
private void registerSchedulerMetrics() {
samplerLock.lock();
try {
// counters for scheduler operations
schedulerAllocateCounter = metrics.counter(
"counter.scheduler.operation.allocate");
schedulerHandleCounter = metrics.counter(
"counter.scheduler.operation.handle");
schedulerHandleCounterMap = new HashMap<>();
for (SchedulerEventType e : SchedulerEventType.values()) {
Counter counter = metrics.counter(
"counter.scheduler.operation.handle." + e);
schedulerHandleCounterMap.put(e, counter);
}
// timers for scheduler operations
int timeWindowSize = conf.getInt(
SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
schedulerAllocateTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimerMap = new HashMap<>();
for (SchedulerEventType e : SchedulerEventType.values()) {
Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimerMap.put(e, timer);
}
// histogram for scheduler operations (Samplers)
schedulerHistogramList = new ArrayList<>();
histogramTimerMap = new HashMap<>();
Histogram schedulerAllocateHistogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register("sampler.scheduler.operation.allocate.timecost",
schedulerAllocateHistogram);
schedulerHistogramList.add(schedulerAllocateHistogram);
histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer);
Histogram schedulerHandleHistogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register("sampler.scheduler.operation.handle.timecost",
schedulerHandleHistogram);
schedulerHistogramList.add(schedulerHandleHistogram);
histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
for (SchedulerEventType e : SchedulerEventType.values()) {
Histogram histogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register(
"sampler.scheduler.operation.handle." + e + ".timecost",
histogram);
schedulerHistogramList.add(histogram);
histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
}
} finally {
samplerLock.unlock();
}
}
private void initMetricsCSVOutput() {
int timeIntervalMS = conf.getInt(
SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
File dir = new File(metricsOutputDir + "/metrics");
if(!dir.exists() && !dir.mkdirs()) {
LOG.error("Cannot create directory " + dir.getAbsoluteFile());
}
final CsvReporter reporter = CsvReporter.forRegistry(metrics)
.formatFor(Locale.US)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build(new File(metricsOutputDir + "/metrics"));
reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
}
boolean isRunning() {
return running;
}
void setRunning(boolean running) {
this.running = running;
}
class HistogramsRunnable implements Runnable {
@Override
public void run() {
samplerLock.lock();
try {
for (Histogram histogram : schedulerHistogramList) {
Timer timer = histogramTimerMap.get(histogram);
histogram.update((int) timer.getSnapshot().getMean());
}
} finally {
samplerLock.unlock();
}
}
}
class MetricsLogRunnable implements Runnable {
private boolean firstLine = true;
MetricsLogRunnable() {
try {
metricsLogBW =
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
metricsOutputDir + "/realtimetrack.json"), "UTF-8"));
metricsLogBW.write("[");
} catch (IOException e) {
LOG.info(e.getMessage());
}
}
@Override
public void run() {
if(running) {
// all WebApp to get real tracking json
String trackingMetrics = web.generateRealTimeTrackingMetrics();
// output
try {
if(firstLine) {
metricsLogBW.write(trackingMetrics + EOL);
firstLine = false;
} else {
metricsLogBW.write("," + trackingMetrics + EOL);
}
metricsLogBW.flush();
} catch (IOException e) {
LOG.info(e.getMessage());
}
}
}
}
void tearDown() throws Exception {
if (metricsLogBW != null) {
metricsLogBW.write("]");
metricsLogBW.close();
}
if (web != null) {
web.stop();
}
if (jobRuntimeLogBW != null) {
jobRuntimeLogBW.close();
}
if (pool != null) {
pool.shutdown();
}
}
void increaseSchedulerAllocationCounter() {
schedulerAllocateCounter.inc();
}
void increaseSchedulerHandleCounter(SchedulerEventType schedulerEventType) {
schedulerHandleCounter.inc();
schedulerHandleCounterMap.get(schedulerEventType).inc();
}
Timer getSchedulerAllocateTimer() {
return schedulerAllocateTimer;
}
Timer getSchedulerHandleTimer() {
return schedulerHandleTimer;
}
Timer getSchedulerHandleTimer(SchedulerEventType schedulerEventType) {
return schedulerHandleTimerMap.get(schedulerEventType);
}
private enum QueueMetric {
PENDING_MEMORY("pending.memory"),
PENDING_VCORES("pending.cores"),
ALLOCATED_MEMORY("allocated.memory"),
ALLOCATED_VCORES("allocated.cores");
private String value;
QueueMetric(String value) {
this.value = value;
}
}
private String getQueueMetricName(String queue, QueueMetric metric) {
return "counter.queue." + queue + "." + metric.value;
}
private void traceQueueIfNotTraced(String queue) {
queueLock.lock();
try {
if (!isTracked(queue)) {
trackQueue(queue);
}
} finally {
queueLock.unlock();
}
}
void initQueueMetric(String queueName){
SortedMap<String, Counter> counterMap = metrics.getCounters();
for (QueueMetric queueMetric : QueueMetric.values()) {
String metricName = getQueueMetricName(queueName, queueMetric);
if (!counterMap.containsKey(metricName)) {
metrics.counter(metricName);
counterMap = metrics.getCounters();
}
}
traceQueueIfNotTraced(queueName);
}
void updateQueueMetrics(Resource pendingResource, Resource allocatedResource,
String queueName) {
SortedMap<String, Counter> counterMap = metrics.getCounters();
for(QueueMetric metric : QueueMetric.values()) {
String metricName = getQueueMetricName(queueName, metric);
if (!counterMap.containsKey(metricName)) {
metrics.counter(metricName);
counterMap = metrics.getCounters();
}
if (metric == QueueMetric.PENDING_MEMORY) {
counterMap.get(metricName).inc(pendingResource.getMemorySize());
} else if (metric == QueueMetric.PENDING_VCORES) {
counterMap.get(metricName).inc(pendingResource.getVirtualCores());
} else if (metric == QueueMetric.ALLOCATED_MEMORY) {
counterMap.get(metricName).inc(allocatedResource.getMemorySize());
} else if (metric == QueueMetric.ALLOCATED_VCORES){
counterMap.get(metricName).inc(allocatedResource.getVirtualCores());
}
}
traceQueueIfNotTraced(queueName);
}
void updateQueueMetricsByRelease(Resource releaseResource, String queue) {
SortedMap<String, Counter> counterMap = metrics.getCounters();
String name = getQueueMetricName(queue, QueueMetric.ALLOCATED_MEMORY);
if (!counterMap.containsKey(name)) {
metrics.counter(name);
counterMap = metrics.getCounters();
}
counterMap.get(name).inc(-releaseResource.getMemorySize());
String vcoreMetric =
getQueueMetricName(queue, QueueMetric.ALLOCATED_VCORES);
if (!counterMap.containsKey(vcoreMetric)) {
metrics.counter(vcoreMetric);
counterMap = metrics.getCounters();
}
counterMap.get(vcoreMetric).inc(-releaseResource.getVirtualCores());
}
public void addTrackedApp(ApplicationId appId,
String oldAppId) {
trackApp(appId, oldAppId);
}
public void removeTrackedApp(String oldAppId) {
untrackApp(oldAppId);
}
public void addAMRuntime(ApplicationId appId, long traceStartTimeMS,
long traceEndTimeMS, long simulateStartTimeMS, long simulateEndTimeMS) {
try {
// write job runtime information
StringBuilder sb = new StringBuilder();
sb.append(appId).append(",").append(traceStartTimeMS).append(",")
.append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
.append(",").append(simulateEndTimeMS);
jobRuntimeLogBW.write(sb.toString() + EOL);
jobRuntimeLogBW.flush();
} catch (IOException e) {
LOG.info(e.getMessage());
}
}
}

View File

@ -17,28 +17,12 @@
*/
package org.apache.hadoop.yarn.sls.scheduler;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import com.codahale.metrics.MetricRegistry;
@Private
@Unstable
public interface SchedulerWrapper {
MetricRegistry getMetrics();
SchedulerMetrics getSchedulerMetrics();
Set<String> getQueueSet();
void setQueueSet(Set<String> queues);
Set<String> getTrackedAppSet();
void setTrackedAppSet(Set<String> apps);
void addTrackedApp(ApplicationId appId, String oldAppId);
void removeTrackedApp(String oldAppId);
void addAMRuntime(ApplicationId appId,
long traceStartTimeMS, long traceEndTimeMS,
long simulateStartTimeMS, long simulateEndTimeMS);
Tracker getTracker();
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import java.util.Set;
@Private
@Unstable
public class Tracker {
private Set<String> queueSet;
private Set<String> trackedAppSet;
public void setQueueSet(Set<String> queues) {
queueSet = queues;
}
public Set<String> getQueueSet() {
return queueSet;
}
public void setTrackedAppSet(Set<String> apps) {
trackedAppSet = apps;
}
public Set<String> getTrackedAppSet() {
return trackedAppSet;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
@Private
@Unstable
public class SLSUtils {
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
// hostname includes the network path and the host name. for example
// "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar".

View File

@ -107,12 +107,12 @@ public class SLSWebApp extends HttpServlet {
public SLSWebApp(SchedulerWrapper wrapper, int metricsAddressPort) {
this.wrapper = wrapper;
metrics = wrapper.getMetrics();
handleOperTimecostHistogramMap =
new HashMap<SchedulerEventType, Histogram>();
queueAllocatedMemoryCounterMap = new HashMap<String, Counter>();
queueAllocatedVCoresCounterMap = new HashMap<String, Counter>();
schedulerMetrics = wrapper.getSchedulerMetrics();
metrics = schedulerMetrics.getMetrics();
port = metricsAddressPort;
}
@ -226,7 +226,7 @@ public class SLSWebApp extends HttpServlet {
response.setStatus(HttpServletResponse.SC_OK);
// queues {0}
Set<String> queues = wrapper.getQueueSet();
Set<String> queues = wrapper.getTracker().getQueueSet();
StringBuilder queueInfo = new StringBuilder();
int i = 0;
@ -265,7 +265,7 @@ public class SLSWebApp extends HttpServlet {
// tracked queues {0}
StringBuilder trackedQueueInfo = new StringBuilder();
Set<String> trackedQueues = wrapper.getQueueSet();
Set<String> trackedQueues = wrapper.getTracker().getQueueSet();
for(String queue : trackedQueues) {
trackedQueueInfo.append("<option value='Queue ").append(queue)
.append("'>").append(queue).append("</option>");
@ -273,7 +273,7 @@ public class SLSWebApp extends HttpServlet {
// tracked apps {1}
StringBuilder trackedAppInfo = new StringBuilder();
Set<String> trackedApps = wrapper.getTrackedAppSet();
Set<String> trackedApps = wrapper.getTracker().getTrackedAppSet();
for(String job : trackedApps) {
trackedAppInfo.append("<option value='Job ").append(job)
.append("'>").append(job).append("</option>");
@ -422,7 +422,7 @@ public class SLSWebApp extends HttpServlet {
// allocated resource for each queue
Map<String, Double> queueAllocatedMemoryMap = new HashMap<String, Double>();
Map<String, Long> queueAllocatedVCoresMap = new HashMap<String, Long>();
for (String queue : wrapper.getQueueSet()) {
for (String queue : wrapper.getTracker().getQueueSet()) {
// memory
String key = "counter.queue." + queue + ".allocated.memory";
if (! queueAllocatedMemoryCounterMap.containsKey(queue) &&
@ -462,7 +462,7 @@ public class SLSWebApp extends HttpServlet {
.append(",\"cluster.available.memory\":").append(availableMemoryGB)
.append(",\"cluster.available.vcores\":").append(availableVCoresGB);
for (String queue : wrapper.getQueueSet()) {
for (String queue : wrapper.getTracker().getQueueSet()) {
sb.append(",\"queue.").append(queue).append(".allocated.memory\":")
.append(queueAllocatedMemoryMap.get(queue));
sb.append(",\"queue.").append(queue).append(".allocated.vcores\":")

View File

@ -22,37 +22,56 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.FairSchedulerMetrics;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
import org.apache.hadoop.yarn.sls.scheduler.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@RunWith(Parameterized.class)
public class TestAMSimulator {
private ResourceManager rm;
private YarnConfiguration conf;
private Path metricOutputDir;
private Class slsScheduler;
private Class scheduler;
@Parameterized.Parameters
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][] {
{SLSFairScheduler.class, FairScheduler.class},
{SLSCapacityScheduler.class, CapacityScheduler.class}
});
}
public TestAMSimulator(Class slsScheduler, Class scheduler) {
this.slsScheduler = slsScheduler;
this.scheduler = scheduler;
}
@Before
public void setup() {
createMetricOutputDir();
conf = new YarnConfiguration();
conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricOutputDir.toString());
conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper");
conf.set(SLSConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName());
conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName());
conf.setBoolean(SLSConfiguration.METRICS_SWITCH, true);
rm = new ResourceManager();
rm.init(conf);
@ -76,15 +95,17 @@ public class TestAMSimulator {
}
private void verifySchedulerMetrics(String appId) {
SchedulerWrapper schedulerWrapper = (SchedulerWrapper)
rm.getResourceScheduler();
MetricRegistry metricRegistry = schedulerWrapper.getMetrics();
for (FairSchedulerMetrics.Metric metric :
FairSchedulerMetrics.Metric.values()) {
String key = "variable.app." + appId + "." + metric.getValue()
+ ".memory";
Assert.assertTrue(metricRegistry.getGauges().containsKey(key));
Assert.assertNotNull(metricRegistry.getGauges().get(key).getValue());
if (scheduler.equals(FairScheduler.class)) {
SchedulerMetrics schedulerMetrics = ((SchedulerWrapper)
rm.getResourceScheduler()).getSchedulerMetrics();
MetricRegistry metricRegistry = schedulerMetrics.getMetrics();
for (FairSchedulerMetrics.Metric metric :
FairSchedulerMetrics.Metric.values()) {
String key = "variable.app." + appId + "." + metric.getValue() +
".memory";
Assert.assertTrue(metricRegistry.getGauges().containsKey(key));
Assert.assertNotNull(metricRegistry.getGauges().get(key).getValue());
}
}
}

View File

@ -21,26 +21,50 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
@RunWith(Parameterized.class)
public class TestNMSimulator {
private final int GB = 1024;
private ResourceManager rm;
private YarnConfiguration conf;
private Class slsScheduler;
private Class scheduler;
@Parameterized.Parameters
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][] {
{SLSFairScheduler.class, FairScheduler.class},
{SLSCapacityScheduler.class, CapacityScheduler.class}
});
}
public TestNMSimulator(Class slsScheduler, Class scheduler) {
this.slsScheduler = slsScheduler;
this.scheduler = scheduler;
}
@Before
public void setup() {
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper");
conf.set(SLSConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName());
conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName());
conf.setBoolean(SLSConfiguration.METRICS_SWITCH, false);
rm = new ResourceManager();
rm.init(conf);