YARN-6451. Add RM monitor validating metrics invariants. Contributed by Carlo Curino

This commit is contained in:
Chris Douglas 2017-04-18 10:28:50 -07:00
parent a9f07e0d3e
commit af8e9842d2
7 changed files with 502 additions and 0 deletions

View File

@ -347,6 +347,7 @@
<exclude>src/test/resources/submit-reservation.json</exclude>
<exclude>src/test/resources/delete-reservation.json</exclude>
<exclude>src/test/resources/update-reservation.json</exclude>
<exclude>src/test/resources/invariants.txt</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.invariants;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
/**
* This exception represents the violation of an internal invariant.
*/
public class InvariantViolationException extends YarnRuntimeException {
public InvariantViolationException(String s) {
super(s);
}
public InvariantViolationException(String s, Exception e) {
super(s, e);
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.invariants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Abstract invariant checker, that setup common context for invariants
* checkers.
*/
public abstract class InvariantsChecker implements SchedulingEditPolicy {
private static final Logger LOG =
LoggerFactory.getLogger(InvariantsChecker.class);
public static final String THROW_ON_VIOLATION =
"yarn.resourcemanager.invariant-checker.throw-on-violation";
public static final String INVARIANT_MONITOR_INTERVAL =
"yarn.resourcemanager.invariant-checker.monitor-interval";
private Configuration conf;
private RMContext context;
private PreemptableResourceScheduler scheduler;
private boolean throwOnInvariantViolation;
private long monitoringInterval;
@Override
public void init(Configuration config, RMContext rmContext,
PreemptableResourceScheduler preemptableResourceScheduler) {
this.conf = config;
this.context = rmContext;
this.scheduler = preemptableResourceScheduler;
this.throwOnInvariantViolation =
conf.getBoolean(InvariantsChecker.THROW_ON_VIOLATION, false);
this.monitoringInterval =
conf.getLong(InvariantsChecker.INVARIANT_MONITOR_INTERVAL, 1000L);
LOG.info("Invariant checker " + this.getPolicyName()
+ " enabled. Monitoring every " + monitoringInterval
+ "ms, throwOnViolation=" + throwOnInvariantViolation);
}
@Override
public long getMonitoringInterval() {
return monitoringInterval;
}
@Override
public String getPolicyName() {
return this.getClass().getSimpleName();
}
public void logOrThrow(String message) throws InvariantViolationException {
if (getThrowOnInvariantViolation()) {
throw new InvariantViolationException(message);
} else {
LOG.warn(message);
}
}
public boolean getThrowOnInvariantViolation() {
return throwOnInvariantViolation;
}
public Configuration getConf() {
return conf;
}
public RMContext getContext() {
return context;
}
public PreemptableResourceScheduler getScheduler() {
return scheduler;
}
}

View File

@ -0,0 +1,195 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.invariants;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* This policy checks at every invocation that a given set of invariants
* (specified in a file) are respected over QueueMetrics and JvmMetrics. The
* file may contain arbitrary (Javascrip) boolean expression over the metrics
* variables.
*
* The right set of invariants depends on the deployment environment, a large
* number of complex invariant can make this check expensive.
*
* The MetricsInvariantChecker can be configured to throw a RuntimeException or
* simlpy warn in the logs if an invariant is not respected.
*/
public class MetricsInvariantChecker extends InvariantsChecker {
private static final Logger LOG =
LoggerFactory.getLogger(MetricsInvariantChecker.class);
public static final String INVARIANTS_FILE =
"yarn.resourcemanager.invariant-checker.file";
private MetricsSystem metricsSystem;
private MetricsCollectorImpl collector;
private SimpleBindings bindings;
private ScriptEngineManager manager;
private Compilable scriptEngine;
private String invariantFile;
private Map<String, CompiledScript> invariants;
private CompiledScript combinedInvariants;
// set of metrics we monitor
private QueueMetrics queueMetrics;
private JvmMetrics jvmMetrics;
@Override
public void init(Configuration config, RMContext rmContext,
PreemptableResourceScheduler preemptableResourceScheduler) {
super.init(config, rmContext, preemptableResourceScheduler);
this.metricsSystem = DefaultMetricsSystem.instance();
this.queueMetrics =
QueueMetrics.forQueue(metricsSystem, "root", null, false, getConf());
this.jvmMetrics = (JvmMetrics) metricsSystem.getSource("JvmMetrics");
// at first collect all metrics
collector = new MetricsCollectorImpl();
queueMetrics.getMetrics(collector, true);
jvmMetrics.getMetrics(collector, true);
// prepare bindings and evaluation engine
this.bindings = new SimpleBindings();
this.manager = new ScriptEngineManager();
this.scriptEngine = (Compilable) manager.getEngineByName("JavaScript");
// load metrics invariant from file
this.invariantFile = getConf().get(MetricsInvariantChecker.INVARIANTS_FILE);
this.invariants = new HashMap<>();
// preload all bindings
queueMetrics.getMetrics(collector, true);
jvmMetrics.getMetrics(collector, true);
for (MetricsRecord record : collector.getRecords()) {
for (AbstractMetric am : record.metrics()) {
bindings.put(am.name().replace(' ', '_'), am.value());
}
}
StringBuilder sb = new StringBuilder();
try {
List<String> tempInv =
Files.readLines(new File(invariantFile), Charsets.UTF_8);
boolean first = true;
// precompile individual invariants
for (String inv : tempInv) {
if(first) {
first = false;
} else {
sb.append("&&");
}
invariants.put(inv, scriptEngine.compile(inv));
sb.append(" (");
sb.append(inv);
sb.append(") ");
}
// create a single large combined invariant for speed of checking
combinedInvariants = scriptEngine.compile(sb.toString());
} catch (IOException e) {
throw new RuntimeException(
"Error loading invariant file: " + e.getMessage());
} catch (ScriptException e) {
throw new RuntimeException("Error compiling invariant " + e.getMessage());
}
}
@Override
public void editSchedule() {
// grab all changed metrics and update bindings
collector.clear();
queueMetrics.getMetrics(collector, false);
jvmMetrics.getMetrics(collector, false);
for (MetricsRecord record : collector.getRecords()) {
for (AbstractMetric am : record.metrics()) {
bindings.put(am.name().replace(' ', '_'), am.value());
}
}
// evaluate all invariants with new bindings
try {
// fastpath check all invariants at once (much faster)
boolean allInvHold = (boolean) combinedInvariants.eval(bindings);
// if any fails, check individually to produce more insightful log
if (!allInvHold) {
for (Map.Entry<String, CompiledScript> e : invariants.entrySet()) {
boolean invariantsHold = (boolean) e.getValue().eval(bindings);
if (!invariantsHold) {
// filter bindings to produce minimal set
Map<String, Object> matchingBindings =
extractMatchingBindings(e.getKey(), bindings);
logOrThrow("Invariant \"" + e.getKey()
+ "\" is NOT holding, with bindings: " + matchingBindings);
}
}
}
} catch (ScriptException e) {
logOrThrow(e.getMessage());
}
}
private static Map<String, Object> extractMatchingBindings(String inv,
SimpleBindings allBindings) {
Map<String, Object> matchingBindings = new HashMap<>();
for (Map.Entry<String, Object> s : allBindings.entrySet()) {
if (inv.contains(s.getKey())) {
matchingBindings.put(s.getKey(), s.getValue());
}
}
return matchingBindings;
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Monitoring policies, used to check invariants.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.invariants;

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.invariants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import static junit.framework.TestCase.fail;
/**
* This class tests the {@code MetricsInvariantChecker} by running it multiple
* time and reporting the time it takes to execute, as well as verifying that
* the invariant throws in case the invariants are not respected.
*/
public class TestMetricsInvariantChecker {
public final static Logger LOG =
Logger.getLogger(TestMetricsInvariantChecker.class);
private MetricsSystem metricsSystem;
private MetricsInvariantChecker ic;
private Configuration conf;
@Before
public void setup() {
this.metricsSystem = DefaultMetricsSystem.instance();
JvmMetrics.initSingleton("ResourceManager", null);
this.ic = new MetricsInvariantChecker();
this.conf = new Configuration();
conf.set(MetricsInvariantChecker.INVARIANTS_FILE,
"src/test/resources/invariants.txt");
conf.setBoolean(MetricsInvariantChecker.THROW_ON_VIOLATION, true);
ic.init(conf, null, null);
}
@Test(timeout = 5000)
public void testManyRuns() {
QueueMetrics qm =
QueueMetrics.forQueue(metricsSystem, "root", null, false, conf);
qm.setAvailableResourcesToQueue(Resource.newInstance(1, 1));
int numIterations = 1000;
long start = System.currentTimeMillis();
for (int i = 0; i < numIterations; i++) {
ic.editSchedule();
}
long end = System.currentTimeMillis();
System.out.println("Runtime per iteration (avg of " + numIterations
+ " iterations): " + (end - start) + " tot time");
}
@Test
public void testViolation() {
// create a "wrong" condition in which the invariants are not respected
QueueMetrics qm =
QueueMetrics.forQueue(metricsSystem, "root", null, false, conf);
qm.setAvailableResourcesToQueue(Resource.newInstance(-1, -1));
// test with throwing exception turned on
try {
ic.editSchedule();
fail();
} catch (InvariantViolationException i) {
// expected
}
// test log-only mode
conf.setBoolean(MetricsInvariantChecker.THROW_ON_VIOLATION, false);
ic.init(conf, null, null);
ic.editSchedule();
}
}

View File

@ -0,0 +1,54 @@
running_0 >= 0
running_60 >= 0
running_300 >= 0
running_1440 >= 0
AppsSubmitted >= 0
AppsRunning >= 0
AppsPending >= 0
AppsCompleted >= 0
AppsKilled >= 0
AppsFailed >= 0
AllocatedMB >= 0
AllocatedVCores >= 0
AllocatedContainers >= 0
AggregateContainersAllocated >= 0
AggregateNodeLocalContainersAllocated >= 0
AggregateRackLocalContainersAllocated >= 0
AggregateOffSwitchContainersAllocated >= 0
AggregateContainersReleased >= 0
AggregateContainersPreempted >= 0
AvailableMB >= 0
AvailableVCores >= 0
PendingMB >= 0
PendingVCores >= 0
PendingContainers >= 0
ReservedMB >= 0
ReservedVCores >= 0
ReservedContainers >= 0
ActiveUsers >= 0
ActiveApplications >= 0
AppAttemptFirstContainerAllocationDelayNumOps >= 0
AppAttemptFirstContainerAllocationDelayAvgTime >= 0
MemNonHeapUsedM >= 0
MemNonHeapCommittedM >= 0
MemNonHeapMaxM >= 0 || MemNonHeapMaxM == -1
MemHeapUsedM >= 0
MemHeapCommittedM >= 0
MemHeapMaxM >= 0
MemMaxM >= 0
GcCountPS_Scavenge >= 0
GcTimeMillisPS_Scavenge >= 0
GcCountPS_MarkSweep >= 0
GcTimeMillisPS_MarkSweep >= 0
GcCount >= 0
GcTimeMillis >= 0
ThreadsNew >= 0
ThreadsRunnable >= 0
ThreadsBlocked >= 0
ThreadsWaiting >= 0
ThreadsTimedWaiting >= 0
ThreadsTerminated >= 0
LogFatal >= 0
LogError >= 0
LogWarn >= 0
LogInfo >= 0