diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 79437f98938..501d11e0ccd 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -40,12 +40,15 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper; +import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; +import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -140,9 +143,9 @@ public void start() throws Exception { // start application masters startAM(); // set queue & tracked apps information - ((ResourceSchedulerWrapper) rm.getResourceScheduler()) + ((SchedulerWrapper) rm.getResourceScheduler()) .setQueueSet(this.queueAppNumMap.keySet()); - ((ResourceSchedulerWrapper) rm.getResourceScheduler()) + ((SchedulerWrapper) rm.getResourceScheduler()) .setTrackedAppSet(this.trackedApps); // print out simulation info printSimulationInfo(); @@ -151,13 +154,24 @@ public void start() throws Exception { // starting the runner once everything is ready to go, runner.start(); } - + private void startRM() throws IOException, ClassNotFoundException { Configuration rmConf = new YarnConfiguration(); String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); - rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass); - rmConf.set(YarnConfiguration.RM_SCHEDULER, - ResourceSchedulerWrapper.class.getName()); + + // For CapacityScheduler we use a sub-classing instead of wrapping + // to allow scheduler-specific invocations from monitors to work + // this can be used for other schedulers as well if we care to + // exercise/track behaviors that are not common to the scheduler api + if(Class.forName(schedulerClass) == CapacityScheduler.class) { + rmConf.set(YarnConfiguration.RM_SCHEDULER, + SLSCapacityScheduler.class.getName()); + } else { + rmConf.set(YarnConfiguration.RM_SCHEDULER, + ResourceSchedulerWrapper.class.getName()); + rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass); + } + rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); rm = new ResourceManager(); rm.init(rmConf); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index c6b994e5400..67c09940120 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -65,7 +65,7 @@ import org.apache.log4j.Logger; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; -import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper; +import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; import org.apache.hadoop.yarn.sls.utils.SLSUtils; @@ -193,7 +193,7 @@ public Object run() throws Exception { simulateFinishTimeMS = System.currentTimeMillis() - SLSRunner.getRunner().getStartTimeMS(); // record job running information - ((ResourceSchedulerWrapper)rm.getResourceScheduler()) + ((SchedulerWrapper)rm.getResourceScheduler()) .addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS, simulateStartTimeMS, simulateFinishTimeMS); @@ -314,13 +314,13 @@ public RegisterApplicationMasterResponse run() throws Exception { private void trackApp() { if (isTracked) { - ((ResourceSchedulerWrapper) rm.getResourceScheduler()) + ((SchedulerWrapper) rm.getResourceScheduler()) .addTrackedApp(appAttemptId, oldAppId); } } public void untrackApp() { if (isTracked) { - ((ResourceSchedulerWrapper) rm.getResourceScheduler()) + ((SchedulerWrapper) rm.getResourceScheduler()) .removeTrackedApp(appAttemptId, oldAppId); } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index ad066e75d28..bc7f7a086ad 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -85,8 +85,8 @@ import com.codahale.metrics.SlidingWindowReservoir; import com.codahale.metrics.Timer; -public class ResourceSchedulerWrapper implements ResourceScheduler, - Configurable { +public class ResourceSchedulerWrapper implements + SchedulerWrapper,ResourceScheduler,Configurable { private static final String EOL = System.getProperty("line.separator"); private static final int SAMPLING_SIZE = 60; private ScheduledExecutorService pool; @@ -150,9 +150,8 @@ public ResourceSchedulerWrapper() { public void setConf(Configuration conf) { this.conf = conf; // set scheduler - Class klass = - conf.getClass(SLSConfiguration.RM_SCHEDULER, null, - ResourceScheduler.class); + Class klass = conf.getClass( + SLSConfiguration.RM_SCHEDULER, null, ResourceScheduler.class); scheduler = ReflectionUtils.newInstance(klass, conf); // start metrics @@ -861,4 +860,3 @@ public List getAppsInQueue(String queue) { return scheduler.getAppsInQueue(queue); } } - diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java new file mode 100644 index 00000000000..1b304de79af --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -0,0 +1,808 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.scheduler; + +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.yarn.sls.SLSRunner; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.web.SLSWebApp; +import com.codahale.metrics.Counter; +import com.codahale.metrics.CsvReporter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SlidingWindowReservoir; +import com.codahale.metrics.Timer; + +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode + .UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .CapacityScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair + .FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo + .FifoScheduler; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Logger; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class SLSCapacityScheduler extends CapacityScheduler implements + SchedulerWrapper,Configurable { + private static final String EOL = System.getProperty("line.separator"); + private static final int SAMPLING_SIZE = 60; + private ScheduledExecutorService pool; + // counters for scheduler allocate/handle operations + private Counter schedulerAllocateCounter; + private Counter schedulerHandleCounter; + private Map schedulerHandleCounterMap; + // Timers for scheduler allocate/handle operations + private Timer schedulerAllocateTimer; + private Timer schedulerHandleTimer; + private Map schedulerHandleTimerMap; + private List schedulerHistogramList; + private Map histogramTimerMap; + private Lock samplerLock; + private Lock queueLock; + + private Configuration conf; + + private Map appQueueMap = + new ConcurrentHashMap(); + private BufferedWriter jobRuntimeLogBW; + + // Priority of the ResourceSchedulerWrapper shutdown hook. + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + + // web app + private SLSWebApp web; + + private Map preemptionContainerMap = + new ConcurrentHashMap(); + + // metrics + private MetricRegistry metrics; + private SchedulerMetrics schedulerMetrics; + private boolean metricsON; + private String metricsOutputDir; + private BufferedWriter metricsLogBW; + private boolean running = false; + private static Map defaultSchedulerMetricsMap = + new HashMap(); + static { + defaultSchedulerMetricsMap.put(FairScheduler.class, + FairSchedulerMetrics.class); + defaultSchedulerMetricsMap.put(FifoScheduler.class, + FifoSchedulerMetrics.class); + defaultSchedulerMetricsMap.put(CapacityScheduler.class, + CapacitySchedulerMetrics.class); + } + // must set by outside + private Set queueSet; + private Set trackedAppSet; + + public final Logger LOG = Logger.getLogger(SLSCapacityScheduler.class); + + public SLSCapacityScheduler() { + samplerLock = new ReentrantLock(); + queueLock = new ReentrantLock(); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + super.setConf(conf); + // start metrics + metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); + if (metricsON) { + try { + initMetrics(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + ShutdownHookManager.get().addShutdownHook(new Runnable() { + @Override + public void run() { + try { + if (metricsLogBW != null) { + metricsLogBW.write("]"); + metricsLogBW.close(); + } + if (web != null) { + web.stop(); + } + tearDown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, SHUTDOWN_HOOK_PRIORITY); + } + + @Override + public Allocation allocate(ApplicationAttemptId attemptId, + List resourceRequests, + List containerIds, + List strings, List strings2) { + if (metricsON) { + final Timer.Context context = schedulerAllocateTimer.time(); + Allocation allocation = null; + try { + allocation = super.allocate(attemptId, resourceRequests, + containerIds, strings, strings2); + return allocation; + } finally { + context.stop(); + schedulerAllocateCounter.inc(); + try { + updateQueueWithAllocateRequest(allocation, attemptId, + resourceRequests, containerIds); + } catch (IOException e) { + e.printStackTrace(); + } + } + } else { + return super.allocate(attemptId, + resourceRequests, containerIds, strings, strings2); + } + } + + @Override + public void handle(SchedulerEvent schedulerEvent) { + // metrics off + if (! metricsON) { + super.handle(schedulerEvent); + return; + } + if(!running) running = true; + + // metrics on + Timer.Context handlerTimer = null; + Timer.Context operationTimer = null; + + NodeUpdateSchedulerEventWrapper eventWrapper; + try { + //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) { + if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE + && schedulerEvent instanceof NodeUpdateSchedulerEvent) { + eventWrapper = new NodeUpdateSchedulerEventWrapper( + (NodeUpdateSchedulerEvent)schedulerEvent); + schedulerEvent = eventWrapper; + updateQueueWithNodeUpdate(eventWrapper); + } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + // check if having AM Container, update resource usage information + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + ApplicationAttemptId appAttemptId = + appRemoveEvent.getApplicationAttemptID(); + String queue = appQueueMap.get(appAttemptId); + SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId); + if (! app.getLiveContainers().isEmpty()) { // have 0 or 1 + // should have one container which is AM container + RMContainer rmc = app.getLiveContainers().iterator().next(); + updateQueueMetrics(queue, + rmc.getContainer().getResource().getMemory(), + rmc.getContainer().getResource().getVirtualCores()); + } + } + + handlerTimer = schedulerHandleTimer.time(); + operationTimer = schedulerHandleTimerMap + .get(schedulerEvent.getType()).time(); + + super.handle(schedulerEvent); + } finally { + if (handlerTimer != null) handlerTimer.stop(); + if (operationTimer != null) operationTimer.stop(); + schedulerHandleCounter.inc(); + schedulerHandleCounterMap.get(schedulerEvent.getType()).inc(); + + if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + SLSRunner.decreaseRemainingApps(); + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + ApplicationAttemptId appAttemptId = + appRemoveEvent.getApplicationAttemptID(); + appQueueMap.remove(appRemoveEvent.getApplicationAttemptID()); + } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_ADDED + && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) { + AppAttemptAddedSchedulerEvent appAddEvent = + (AppAttemptAddedSchedulerEvent) schedulerEvent; + String queueName = appAddEvent.getQueue(); + appQueueMap.put(appAddEvent.getApplicationAttemptId(), queueName); + } + } + } + + private void updateQueueWithNodeUpdate( + NodeUpdateSchedulerEventWrapper eventWrapper) { + RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode(); + List containerList = node.getContainerUpdates(); + for (UpdatedContainerInfo info : containerList) { + for (ContainerStatus status : info.getCompletedContainers()) { + ContainerId containerId = status.getContainerId(); + SchedulerAppReport app = super.getSchedulerAppInfo( + containerId.getApplicationAttemptId()); + + if (app == null) { + // this happens for the AM container + // The app have already removed when the NM sends the release + // information. + continue; + } + + String queue = appQueueMap.get(containerId.getApplicationAttemptId()); + int releasedMemory = 0, releasedVCores = 0; + if (status.getExitStatus() == ContainerExitStatus.SUCCESS) { + for (RMContainer rmc : app.getLiveContainers()) { + if (rmc.getContainerId() == containerId) { + releasedMemory += rmc.getContainer().getResource().getMemory(); + releasedVCores += rmc.getContainer() + .getResource().getVirtualCores(); + break; + } + } + } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) { + if (preemptionContainerMap.containsKey(containerId)) { + Resource preResource = preemptionContainerMap.get(containerId); + releasedMemory += preResource.getMemory(); + releasedVCores += preResource.getVirtualCores(); + preemptionContainerMap.remove(containerId); + } + } + // update queue counters + updateQueueMetrics(queue, releasedMemory, releasedVCores); + } + } + } + + private void updateQueueWithAllocateRequest(Allocation allocation, + ApplicationAttemptId attemptId, + List resourceRequests, + List containerIds) throws IOException { + // update queue information + Resource pendingResource = Resources.createResource(0, 0); + Resource allocatedResource = Resources.createResource(0, 0); + String queueName = appQueueMap.get(attemptId); + // container requested + for (ResourceRequest request : resourceRequests) { + if (request.getResourceName().equals(ResourceRequest.ANY)) { + Resources.addTo(pendingResource, + Resources.multiply(request.getCapability(), + request.getNumContainers())); + } + } + // container allocated + for (Container container : allocation.getContainers()) { + Resources.addTo(allocatedResource, container.getResource()); + Resources.subtractFrom(pendingResource, container.getResource()); + } + // container released from AM + SchedulerAppReport report = super.getSchedulerAppInfo(attemptId); + for (ContainerId containerId : containerIds) { + Container container = null; + for (RMContainer c : report.getLiveContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + // released allocated containers + Resources.subtractFrom(allocatedResource, container.getResource()); + } else { + for (RMContainer c : report.getReservedContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + // released reserved containers + Resources.subtractFrom(pendingResource, container.getResource()); + } + } + } + // containers released/preemption from scheduler + Set preemptionContainers = new HashSet(); + if (allocation.getContainerPreemptions() != null) { + preemptionContainers.addAll(allocation.getContainerPreemptions()); + } + if (allocation.getStrictContainerPreemptions() != null) { + preemptionContainers.addAll(allocation.getStrictContainerPreemptions()); + } + if (! preemptionContainers.isEmpty()) { + for (ContainerId containerId : preemptionContainers) { + if (! preemptionContainerMap.containsKey(containerId)) { + Container container = null; + for (RMContainer c : report.getLiveContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + preemptionContainerMap.put(containerId, container.getResource()); + } + } + + } + } + + // update metrics + SortedMap counterMap = metrics.getCounters(); + String names[] = new String[]{ + "counter.queue." + queueName + ".pending.memory", + "counter.queue." + queueName + ".pending.cores", + "counter.queue." + queueName + ".allocated.memory", + "counter.queue." + queueName + ".allocated.cores"}; + int values[] = new int[]{pendingResource.getMemory(), + pendingResource.getVirtualCores(), + allocatedResource.getMemory(), allocatedResource.getVirtualCores()}; + for (int i = names.length - 1; i >= 0; i --) { + if (! counterMap.containsKey(names[i])) { + metrics.counter(names[i]); + counterMap = metrics.getCounters(); + } + counterMap.get(names[i]).inc(values[i]); + } + + queueLock.lock(); + try { + if (! schedulerMetrics.isTracked(queueName)) { + schedulerMetrics.trackQueue(queueName); + } + } finally { + queueLock.unlock(); + } + } + + private void tearDown() throws IOException { + // close job runtime writer + if (jobRuntimeLogBW != null) { + jobRuntimeLogBW.close(); + } + // shut pool + if (pool != null) pool.shutdown(); + } + + @SuppressWarnings("unchecked") + private void initMetrics() throws Exception { + metrics = new MetricRegistry(); + // configuration + metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR); + int metricsWebAddressPort = conf.getInt( + SLSConfiguration.METRICS_WEB_ADDRESS_PORT, + SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT); + // create SchedulerMetrics for current scheduler + String schedulerMetricsType = conf.get(CapacityScheduler.class.getName()); + Class schedulerMetricsClass = schedulerMetricsType == null? + defaultSchedulerMetricsMap.get(CapacityScheduler.class) : + Class.forName(schedulerMetricsType); + schedulerMetrics = (SchedulerMetrics)ReflectionUtils + .newInstance(schedulerMetricsClass, new Configuration()); + schedulerMetrics.init(this, metrics); + + // register various metrics + registerJvmMetrics(); + registerClusterResourceMetrics(); + registerContainerAppNumMetrics(); + registerSchedulerMetrics(); + + // .csv output + initMetricsCSVOutput(); + + // start web app to provide real-time tracking + web = new SLSWebApp(this, metricsWebAddressPort); + web.start(); + + // a thread to update histogram timer + pool = new ScheduledThreadPoolExecutor(2); + pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000, + TimeUnit.MILLISECONDS); + + // a thread to output metrics for real-tiem tracking + pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000, + TimeUnit.MILLISECONDS); + + // application running information + jobRuntimeLogBW = new BufferedWriter( + new FileWriter(metricsOutputDir + "/jobruntime.csv")); + jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," + + "simulate_start_time,simulate_end_time" + EOL); + jobRuntimeLogBW.flush(); + } + + private void registerJvmMetrics() { + // add JVM gauges + metrics.register("variable.jvm.free.memory", + new Gauge() { + @Override + public Long getValue() { + return Runtime.getRuntime().freeMemory(); + } + } + ); + metrics.register("variable.jvm.max.memory", + new Gauge() { + @Override + public Long getValue() { + return Runtime.getRuntime().maxMemory(); + } + } + ); + metrics.register("variable.jvm.total.memory", + new Gauge() { + @Override + public Long getValue() { + return Runtime.getRuntime().totalMemory(); + } + } + ); + } + + private void registerClusterResourceMetrics() { + metrics.register("variable.cluster.allocated.memory", + new Gauge() { + @Override + public Integer getValue() { + if( getRootQueueMetrics() == null) { + return 0; + } else { + return getRootQueueMetrics().getAllocatedMB(); + } + } + } + ); + metrics.register("variable.cluster.allocated.vcores", + new Gauge() { + @Override + public Integer getValue() { + if(getRootQueueMetrics() == null) { + return 0; + } else { + return getRootQueueMetrics().getAllocatedVirtualCores(); + } + } + } + ); + metrics.register("variable.cluster.available.memory", + new Gauge() { + @Override + public Integer getValue() { + if(getRootQueueMetrics() == null) { + return 0; + } else { + return getRootQueueMetrics().getAvailableMB(); + } + } + } + ); + metrics.register("variable.cluster.available.vcores", + new Gauge() { + @Override + public Integer getValue() { + if(getRootQueueMetrics() == null) { + return 0; + } else { + return getRootQueueMetrics().getAvailableVirtualCores(); + } + } + } + ); + } + + private void registerContainerAppNumMetrics() { + metrics.register("variable.running.application", + new Gauge() { + @Override + public Integer getValue() { + if(getRootQueueMetrics() == null) { + return 0; + } else { + return getRootQueueMetrics().getAppsRunning(); + } + } + } + ); + metrics.register("variable.running.container", + new Gauge() { + @Override + public Integer getValue() { + if(getRootQueueMetrics() == null) { + return 0; + } else { + return getRootQueueMetrics().getAllocatedContainers(); + } + } + } + ); + } + + private void registerSchedulerMetrics() { + samplerLock.lock(); + try { + // counters for scheduler operations + schedulerAllocateCounter = metrics.counter( + "counter.scheduler.operation.allocate"); + schedulerHandleCounter = metrics.counter( + "counter.scheduler.operation.handle"); + schedulerHandleCounterMap = new HashMap(); + for (SchedulerEventType e : SchedulerEventType.values()) { + Counter counter = metrics.counter( + "counter.scheduler.operation.handle." + e); + schedulerHandleCounterMap.put(e, counter); + } + // timers for scheduler operations + int timeWindowSize = conf.getInt( + SLSConfiguration.METRICS_TIMER_WINDOW_SIZE, + SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT); + schedulerAllocateTimer = new Timer( + new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimer = new Timer( + new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimerMap = new HashMap(); + for (SchedulerEventType e : SchedulerEventType.values()) { + Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimerMap.put(e, timer); + } + // histogram for scheduler operations (Samplers) + schedulerHistogramList = new ArrayList(); + histogramTimerMap = new HashMap(); + Histogram schedulerAllocateHistogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register("sampler.scheduler.operation.allocate.timecost", + schedulerAllocateHistogram); + schedulerHistogramList.add(schedulerAllocateHistogram); + histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer); + Histogram schedulerHandleHistogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register("sampler.scheduler.operation.handle.timecost", + schedulerHandleHistogram); + schedulerHistogramList.add(schedulerHandleHistogram); + histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer); + for (SchedulerEventType e : SchedulerEventType.values()) { + Histogram histogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register( + "sampler.scheduler.operation.handle." + e + ".timecost", + histogram); + schedulerHistogramList.add(histogram); + histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e)); + } + } finally { + samplerLock.unlock(); + } + } + + private void initMetricsCSVOutput() { + int timeIntervalMS = conf.getInt( + SLSConfiguration.METRICS_RECORD_INTERVAL_MS, + SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT); + File dir = new File(metricsOutputDir + "/metrics"); + if(! dir.exists() + && ! dir.mkdirs()) { + LOG.error("Cannot create directory " + dir.getAbsoluteFile()); + } + final CsvReporter reporter = CsvReporter.forRegistry(metrics) + .formatFor(Locale.US) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(new File(metricsOutputDir + "/metrics")); + reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS); + } + + class HistogramsRunnable implements Runnable { + @Override + public void run() { + samplerLock.lock(); + try { + for (Histogram histogram : schedulerHistogramList) { + Timer timer = histogramTimerMap.get(histogram); + histogram.update((int) timer.getSnapshot().getMean()); + } + } finally { + samplerLock.unlock(); + } + } + } + + class MetricsLogRunnable implements Runnable { + private boolean firstLine = true; + public MetricsLogRunnable() { + try { + metricsLogBW = new BufferedWriter( + new FileWriter(metricsOutputDir + "/realtimetrack.json")); + metricsLogBW.write("["); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void run() { + if(running) { + // all WebApp to get real tracking json + String metrics = web.generateRealTimeTrackingMetrics(); + // output + try { + if(firstLine) { + metricsLogBW.write(metrics + EOL); + firstLine = false; + } else { + metricsLogBW.write("," + metrics + EOL); + } + metricsLogBW.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + // the following functions are used by AMSimulator + public void addAMRuntime(ApplicationId appId, + long traceStartTimeMS, long traceEndTimeMS, + long simulateStartTimeMS, long simulateEndTimeMS) { + + try { + // write job runtime information + StringBuilder sb = new StringBuilder(); + sb.append(appId).append(",").append(traceStartTimeMS).append(",") + .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) + .append(",").append(simulateEndTimeMS); + jobRuntimeLogBW.write(sb.toString() + EOL); + jobRuntimeLogBW.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void updateQueueMetrics(String queue, + int releasedMemory, int releasedVCores) { + // update queue counters + SortedMap counterMap = metrics.getCounters(); + if (releasedMemory != 0) { + String name = "counter.queue." + queue + ".allocated.memory"; + if (! counterMap.containsKey(name)) { + metrics.counter(name); + counterMap = metrics.getCounters(); + } + counterMap.get(name).inc(-releasedMemory); + } + if (releasedVCores != 0) { + String name = "counter.queue." + queue + ".allocated.cores"; + if (! counterMap.containsKey(name)) { + metrics.counter(name); + counterMap = metrics.getCounters(); + } + counterMap.get(name).inc(-releasedVCores); + } + } + + public void setQueueSet(Set queues) { + this.queueSet = queues; + } + + public Set getQueueSet() { + return this.queueSet; + } + + public void setTrackedAppSet(Set apps) { + this.trackedAppSet = apps; + } + + public Set getTrackedAppSet() { + return this.trackedAppSet; + } + + public MetricRegistry getMetrics() { + return metrics; + } + + public SchedulerMetrics getSchedulerMetrics() { + return schedulerMetrics; + } + + // API open to out classes + public void addTrackedApp(ApplicationAttemptId appAttemptId, + String oldAppId) { + if (metricsON) { + schedulerMetrics.trackApp(appAttemptId, oldAppId); + } + } + + public void removeTrackedApp(ApplicationAttemptId appAttemptId, + String oldAppId) { + if (metricsON) { + schedulerMetrics.untrackApp(appAttemptId, oldAppId); + } + } + + @Override + public Configuration getConf() { + return conf; + } + + + + +} + diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java new file mode 100644 index 00000000000..44629f5347f --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.scheduler; + +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import com.codahale.metrics.MetricRegistry; + +public interface SchedulerWrapper { + + public MetricRegistry getMetrics(); + public SchedulerMetrics getSchedulerMetrics(); + public Set getQueueSet(); + public void setQueueSet(Set queues); + public Set getTrackedAppSet(); + public void setTrackedAppSet(Set apps); + public void addTrackedApp(ApplicationAttemptId appAttemptId, + String oldAppId); + public void removeTrackedApp(ApplicationAttemptId appAttemptId, + String oldAppId); + public void addAMRuntime(ApplicationId appId, + long traceStartTimeMS, long traceEndTimeMS, + long simulateStartTimeMS, long simulateEndTimeMS); + +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java index 123ccea718a..e6dd8467898 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.sls.scheduler.FairSchedulerMetrics; import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper; import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; +import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; + import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; @@ -50,7 +52,7 @@ public class SLSWebApp extends HttpServlet { private static final long serialVersionUID = 1905162041950251407L; private transient Server server; - private transient ResourceSchedulerWrapper wrapper; + private transient SchedulerWrapper wrapper; private transient MetricRegistry metrics; private transient SchedulerMetrics schedulerMetrics; // metrics objects @@ -90,7 +92,7 @@ public class SLSWebApp extends HttpServlet { } } - public SLSWebApp(ResourceSchedulerWrapper wrapper, int metricsAddressPort) { + public SLSWebApp(SchedulerWrapper wrapper, int metricsAddressPort) { this.wrapper = wrapper; metrics = wrapper.getMetrics(); handleOperTimecostHistogramMap = diff --git a/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml b/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml new file mode 100644 index 00000000000..61be96ae6d4 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml @@ -0,0 +1,60 @@ + + + + + + + + yarn.scheduler.capacity.root.queues + sls_queue_1,sls_queue_2,sls_queue_3 + The queues at the this level (root is the root queue). + + + + + yarn.scheduler.capacity.root.sls_queue_1.capacity + 25 + + + + yarn.scheduler.capacity.root.sls_queue_1.maximum-capacity + 100 + + + + yarn.scheduler.capacity.root.sls_queue_2.capacity + 25 + + + + yarn.scheduler.capacity.root.sls_queue_2.maximum-capacity + 100 + + + + yarn.scheduler.capacity.root.sls_queue_3.capacity + 50 + + + + yarn.scheduler.capacity.root.sls_queue_3.maximum-capacity + 100 + + diff --git a/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml b/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml index f6c6a4a983d..78aa6f2dd7a 100644 --- a/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml +++ b/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml @@ -17,7 +17,18 @@ yarn.resourcemanager.scheduler.class - org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler + org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler + + + + + yarn.resourcemanager.scheduler.monitor.enable + true + + + + yarn.resourcemanager.scheduler.monitor.policies + org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy