merge -r 1375065:1375066 from trunk. FIXES: YARN-27
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1375067 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fa76c71e62
commit
8a5ab9229d
@ -51,3 +51,5 @@ Release 0.23.3 - Unreleased
|
|||||||
|
|
||||||
YARN-25. remove old aggregated logs (Robert Evans via tgraves)
|
YARN-25. remove old aggregated logs (Robert Evans via tgraves)
|
||||||
|
|
||||||
|
YARN-27. Failed refreshQueues due to misconfiguration prevents further
|
||||||
|
refreshing of queues (Arun Murthy via tgraves)
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||||
@ -120,14 +121,41 @@ static QueueMetrics forQueue(String queueName, Queue parent,
|
|||||||
enableUserMetrics, conf);
|
enableUserMetrics, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static QueueMetrics forQueue(MetricsSystem ms, String queueName,
|
/**
|
||||||
|
* Helper method to clear cache - used only for unit tests.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public synchronized static void clearQueueMetrics() {
|
||||||
|
queueMetrics.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple metrics cache to help prevent re-registrations.
|
||||||
|
*/
|
||||||
|
private static Map<String, QueueMetrics> queueMetrics =
|
||||||
|
new HashMap<String, QueueMetrics>();
|
||||||
|
|
||||||
|
public synchronized
|
||||||
|
static QueueMetrics forQueue(MetricsSystem ms, String queueName,
|
||||||
Queue parent, boolean enableUserMetrics,
|
Queue parent, boolean enableUserMetrics,
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
QueueMetrics metrics =
|
QueueMetrics metrics = queueMetrics.get(queueName);
|
||||||
new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf
|
if (metrics == null) {
|
||||||
).tag(QUEUE_INFO, queueName);
|
metrics =
|
||||||
return ms == null ? metrics : ms.register(sourceName(queueName).toString(),
|
new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf).
|
||||||
"Metrics for queue: " + queueName, metrics);
|
tag(QUEUE_INFO, queueName);
|
||||||
|
|
||||||
|
// Register with the MetricsSystems
|
||||||
|
if (ms != null) {
|
||||||
|
metrics =
|
||||||
|
ms.register(
|
||||||
|
sourceName(queueName).toString(),
|
||||||
|
"Metrics for queue: " + queueName, metrics);
|
||||||
|
}
|
||||||
|
queueMetrics.put(queueName, metrics);
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized QueueMetrics getUserMetrics(String userName) {
|
public synchronized QueueMetrics getUserMetrics(String userName) {
|
||||||
|
@ -38,14 +38,22 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestQueueMetrics {
|
public class TestQueueMetrics {
|
||||||
static final int GB = 1024; // MB
|
static final int GB = 1024; // MB
|
||||||
private static final Configuration conf = new Configuration();
|
private static final Configuration conf = new Configuration();
|
||||||
|
|
||||||
final MetricsSystem ms = new MetricsSystemImpl();
|
private MetricsSystem ms;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
ms = new MetricsSystemImpl();
|
||||||
|
QueueMetrics.clearQueueMetrics();
|
||||||
|
}
|
||||||
|
|
||||||
@Test public void testDefaultSingleQueueMetrics() {
|
@Test public void testDefaultSingleQueueMetrics() {
|
||||||
String queueName = "single";
|
String queueName = "single";
|
||||||
String user = "alice";
|
String user = "alice";
|
||||||
@ -226,6 +234,37 @@ public void testQueueAppMetricsForMultipleFailures() {
|
|||||||
checkApps(userSource, 1, 0, 0, 1, 0, 0);
|
checkApps(userSource, 1, 0, 0, 1, 0, 0);
|
||||||
checkApps(parentUserSource, 1, 0, 0, 1, 0, 0);
|
checkApps(parentUserSource, 1, 0, 0, 1, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetricsCache() {
|
||||||
|
MetricsSystem ms = new MetricsSystemImpl("cache");
|
||||||
|
|
||||||
|
try {
|
||||||
|
String p1 = "root1";
|
||||||
|
String leafQueueName = "root1.leaf";
|
||||||
|
ms.start();
|
||||||
|
|
||||||
|
QueueMetrics p1Metrics =
|
||||||
|
QueueMetrics.forQueue(ms, p1, null, true, conf);
|
||||||
|
Queue parentQueue1 = make(stub(Queue.class).returning(p1Metrics).
|
||||||
|
from.getMetrics());
|
||||||
|
QueueMetrics metrics =
|
||||||
|
QueueMetrics.forQueue(ms, leafQueueName, parentQueue1, true, conf);
|
||||||
|
|
||||||
|
Assert.assertNotNull("QueueMetrics for A shoudn't be null", metrics);
|
||||||
|
|
||||||
|
// Re-register to check for cache hit, shouldn't blow up metrics-system...
|
||||||
|
// also, verify parent-metrics
|
||||||
|
QueueMetrics alterMetrics =
|
||||||
|
QueueMetrics.forQueue(ms, leafQueueName, parentQueue1, true, conf);
|
||||||
|
|
||||||
|
Assert.assertNotNull("QueueMetrics for alterMetrics shoudn't be null",
|
||||||
|
alterMetrics);
|
||||||
|
} finally {
|
||||||
|
ms.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static void checkApps(MetricsSource source, int submitted, int pending,
|
public static void checkApps(MetricsSource source, int submitted, int pending,
|
||||||
int running, int completed, int failed, int killed) {
|
int running, int completed, int failed, int killed) {
|
||||||
|
@ -98,7 +98,8 @@ public void setUp() throws Exception {
|
|||||||
csConf =
|
csConf =
|
||||||
new CapacitySchedulerConfiguration();
|
new CapacitySchedulerConfiguration();
|
||||||
csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
|
csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
|
||||||
setupQueueConfiguration(csConf);
|
final String newRoot = "root" + System.currentTimeMillis();
|
||||||
|
setupQueueConfiguration(csConf, newRoot);
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
cs.setConf(conf);
|
cs.setConf(conf);
|
||||||
|
|
||||||
@ -112,7 +113,8 @@ public void setUp() throws Exception {
|
|||||||
when(csContext.getClusterResources()).
|
when(csContext.getClusterResources()).
|
||||||
thenReturn(Resources.createResource(100 * 16 * GB));
|
thenReturn(Resources.createResource(100 * 16 * GB));
|
||||||
root =
|
root =
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
|
CapacityScheduler.parseQueue(csContext, csConf, null,
|
||||||
|
CapacitySchedulerConfiguration.ROOT,
|
||||||
queues, queues,
|
queues, queues,
|
||||||
CapacityScheduler.queueComparator,
|
CapacityScheduler.queueComparator,
|
||||||
CapacityScheduler.applicationComparator,
|
CapacityScheduler.applicationComparator,
|
||||||
@ -126,25 +128,33 @@ public void setUp() throws Exception {
|
|||||||
private static final String C = "c";
|
private static final String C = "c";
|
||||||
private static final String C1 = "c1";
|
private static final String C1 = "c1";
|
||||||
private static final String D = "d";
|
private static final String D = "d";
|
||||||
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
private void setupQueueConfiguration(
|
||||||
|
CapacitySchedulerConfiguration conf,
|
||||||
|
final String newRoot) {
|
||||||
|
|
||||||
// Define top-level queues
|
// Define top-level queues
|
||||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D});
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {newRoot});
|
||||||
conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100);
|
conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100);
|
||||||
conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100);
|
conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100);
|
||||||
conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
|
conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
|
||||||
|
|
||||||
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
|
||||||
|
conf.setQueues(Q_newRoot, new String[] {A, B, C, D});
|
||||||
|
conf.setCapacity(Q_newRoot, 100);
|
||||||
|
conf.setMaximumCapacity(Q_newRoot, 100);
|
||||||
|
conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
|
||||||
|
|
||||||
|
final String Q_A = Q_newRoot + "." + A;
|
||||||
conf.setCapacity(Q_A, 8.5f);
|
conf.setCapacity(Q_A, 8.5f);
|
||||||
conf.setMaximumCapacity(Q_A, 20);
|
conf.setMaximumCapacity(Q_A, 20);
|
||||||
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
|
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
|
||||||
|
|
||||||
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
final String Q_B = Q_newRoot + "." + B;
|
||||||
conf.setCapacity(Q_B, 80);
|
conf.setCapacity(Q_B, 80);
|
||||||
conf.setMaximumCapacity(Q_B, 99);
|
conf.setMaximumCapacity(Q_B, 99);
|
||||||
conf.setAcl(Q_B, QueueACL.SUBMIT_APPLICATIONS, "*");
|
conf.setAcl(Q_B, QueueACL.SUBMIT_APPLICATIONS, "*");
|
||||||
|
|
||||||
final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
|
final String Q_C = Q_newRoot + "." + C;
|
||||||
conf.setCapacity(Q_C, 1.5f);
|
conf.setCapacity(Q_C, 1.5f);
|
||||||
conf.setMaximumCapacity(Q_C, 10);
|
conf.setMaximumCapacity(Q_C, 10);
|
||||||
conf.setAcl(Q_C, QueueACL.SUBMIT_APPLICATIONS, " ");
|
conf.setAcl(Q_C, QueueACL.SUBMIT_APPLICATIONS, " ");
|
||||||
@ -154,7 +164,7 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
|||||||
final String Q_C1 = Q_C + "." + C1;
|
final String Q_C1 = Q_C + "." + C1;
|
||||||
conf.setCapacity(Q_C1, 100);
|
conf.setCapacity(Q_C1, 100);
|
||||||
|
|
||||||
final String Q_D = CapacitySchedulerConfiguration.ROOT + "." + D;
|
final String Q_D = Q_newRoot + "." + D;
|
||||||
conf.setCapacity(Q_D, 10);
|
conf.setCapacity(Q_D, 10);
|
||||||
conf.setMaximumCapacity(Q_D, 11);
|
conf.setMaximumCapacity(Q_D, 11);
|
||||||
conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d");
|
conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d");
|
||||||
|
Loading…
x
Reference in New Issue
Block a user