YARN-3609. Load node labels from storage inside RM serviceStart. Contributed by Wangda Tan

This commit is contained in:
Jian He 2015-05-20 17:06:52 -07:00
parent b275818925
commit ce45e4e82e
6 changed files with 120 additions and 11 deletions

View File

@ -109,6 +109,9 @@ Release 2.7.1 - UNRELEASED
YARN-3681. yarn cmd says "could not find main class 'queue'" in windows. YARN-3681. yarn cmd says "could not find main class 'queue'" in windows.
(Craig Welch and Varun Saxena via xgong) (Craig Welch and Varun Saxena via xgong)
YARN-3609. Load node labels from storage inside RM serviceStart. (Wangda
Tan via jianhe)
Release 2.7.0 - 2015-04-20 Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -210,9 +210,6 @@ public class CommonNodeLabelsManager extends AbstractService {
nodeLabelsEnabled = nodeLabelsEnabled =
conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED, conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED,
YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED); YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED);
if (nodeLabelsEnabled) {
initNodeLabelStore(conf);
}
labelCollections.put(NO_LABEL, new NodeLabel(NO_LABEL)); labelCollections.put(NO_LABEL, new NodeLabel(NO_LABEL));
} }
@ -232,6 +229,10 @@ public class CommonNodeLabelsManager extends AbstractService {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
if (nodeLabelsEnabled) {
initNodeLabelStore(getConfig());
}
// init dispatcher only when service start, because recover will happen in // init dispatcher only when service start, because recover will happen in
// service init, we don't want to trigger any event handling at that time. // service init, we don't want to trigger any event handling at that time.
initDispatcher(getConfig()); initDispatcher(getConfig());

View File

@ -107,6 +107,7 @@ public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase {
mgr = new MockNodeLabelManager(); mgr = new MockNodeLabelManager();
mgr.init(conf); mgr.init(conf);
mgr.start();
// check variables // check variables
Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
@ -126,6 +127,7 @@ public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase {
mgr.stop(); mgr.stop();
mgr = new MockNodeLabelManager(); mgr = new MockNodeLabelManager();
mgr.init(conf); mgr.init(conf);
mgr.start();
// check variables // check variables
Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
@ -170,6 +172,7 @@ public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase {
mgr = new MockNodeLabelManager(); mgr = new MockNodeLabelManager();
mgr.init(conf); mgr.init(conf);
mgr.start();
// check variables // check variables
Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); Assert.assertEquals(3, mgr.getClusterNodeLabels().size());

View File

@ -94,6 +94,8 @@ import org.junit.Assert;
public class MockRM extends ResourceManager { public class MockRM extends ResourceManager {
static final String ENABLE_WEBAPP = "mockrm.webapp.enabled"; static final String ENABLE_WEBAPP = "mockrm.webapp.enabled";
final private boolean useNullRMNodeLabelsManager;
public MockRM() { public MockRM() {
this(new YarnConfiguration()); this(new YarnConfiguration());
@ -104,20 +106,31 @@ public class MockRM extends ResourceManager {
} }
public MockRM(Configuration conf, RMStateStore store) { public MockRM(Configuration conf, RMStateStore store) {
super(); this(conf, store, true);
}
public MockRM(Configuration conf, RMStateStore store,
boolean useNullRMNodeLabelsManager) {
super();
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
if(store != null) { if(store != null) {
setRMStateStore(store); setRMStateStore(store);
} }
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
} }
@Override @Override
protected RMNodeLabelsManager createNodeLabelManager() { protected RMNodeLabelsManager createNodeLabelManager()
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); throws InstantiationException, IllegalAccessException {
mgr.init(getConfig()); if (useNullRMNodeLabelsManager) {
return mgr; RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(getConfig());
return mgr;
} else {
return super.createNodeLabelManager();
}
} }
public void waitForState(ApplicationId appId, RMAppState finalState) public void waitForState(ApplicationId appId, RMAppState finalState)

View File

@ -106,8 +106,8 @@ public class RMHATestBase extends ClientBaseWithFixes{
} }
protected void startRMs() throws IOException { protected void startRMs() throws IOException {
rm1 = new MockRM(confForRM1); rm1 = new MockRM(confForRM1, null, false);
rm2 = new MockRM(confForRM2); rm2 = new MockRM(confForRM2, null, false);
startRMs(rm1, confForRM1, rm2, confForRM2); startRMs(rm1, confForRM1, rm2, confForRM2);
} }

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
public class TestRMHAForNodeLabels extends RMHATestBase {
public static final Log LOG = LogFactory
.getLog(TestSubmitApplicationWithRMHA.class);
@Before
@Override
public void setup() throws Exception {
super.setup();
// Create directory for node label store
File tempDir = File.createTempFile("nlb", ".tmp");
tempDir.delete();
tempDir.mkdirs();
tempDir.deleteOnExit();
confForRM1.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
confForRM1.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
tempDir.getAbsolutePath());
confForRM2.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
confForRM2.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
tempDir.getAbsolutePath());
}
@Test
public void testRMHARecoverNodeLabels() throws Exception {
// start two RMs, and transit rm1 to active, rm2 to standby
startRMs();
// Add labels to rm1
rm1.getRMContext()
.getNodeLabelManager()
.addToCluserNodeLabels(ImmutableSet.of("a", "b", "c"));
Map<NodeId, Set<String>> nodeToLabels = new HashMap<>();
nodeToLabels.put(NodeId.newInstance("host1", 0), ImmutableSet.of("a"));
nodeToLabels.put(NodeId.newInstance("host2", 0), ImmutableSet.of("b"));
rm1.getRMContext().getNodeLabelManager().replaceLabelsOnNode(nodeToLabels);
// Do the failover
explicitFailover();
// Check labels in rm2
Assert
.assertTrue(rm2.getRMContext().getNodeLabelManager()
.getClusterNodeLabels()
.containsAll(ImmutableSet.of("a", "b", "c")));
Assert.assertTrue(rm2.getRMContext().getNodeLabelManager()
.getNodeLabels().get(NodeId.newInstance("host1", 0)).contains("a"));
Assert.assertTrue(rm2.getRMContext().getNodeLabelManager()
.getNodeLabels().get(NodeId.newInstance("host2", 0)).contains("b"));
}
}