YARN-4405. Support node label store in non-appendable file system. Contributed by Wangda Tan

(cherry picked from commit 755dda8dd8bb23864abc752bad506f223fcac010)

(cherry picked from commit be404e168060c03a33a35c8d30925e317a64cad7)
This commit is contained in:
Jian He 2015-12-03 17:45:31 -08:00 committed by Wangda Tan
parent 0b88a7f259
commit f355645ca4
11 changed files with 107 additions and 62 deletions

View File

@ -18,27 +18,22 @@
package org.apache.hadoop.conf; package org.apache.hadoop.conf;
import java.lang.Class; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
/** /**
* Base class for comparing fields in one or more Configuration classes * Base class for comparing fields in one or more Configuration classes
* against a corresponding .xml file. Usage is intended as follows: * against a corresponding .xml file. Usage is intended as follows:
@ -331,6 +326,7 @@ public abstract class TestConfigurationFieldsBase {
private static Set<String> compareConfigurationToXmlFields(Map<String,String> keyMap1, Map<String,String> keyMap2) { private static Set<String> compareConfigurationToXmlFields(Map<String,String> keyMap1, Map<String,String> keyMap2) {
Set<String> retVal = new HashSet<String>(keyMap1.keySet()); Set<String> retVal = new HashSet<String>(keyMap1.keySet());
retVal.removeAll(keyMap2.keySet()); retVal.removeAll(keyMap2.keySet());
return retVal; return retVal;
} }

View File

@ -539,6 +539,9 @@ Release 2.8.0 - UNRELEASED
YARN-4292. ResourceUtilization should be a part of NodeInfo REST API. YARN-4292. ResourceUtilization should be a part of NodeInfo REST API.
(Sunil G via wangda) (Sunil G via wangda)
YARN-4405. Support node label store in non-appendable file system. (Wangda
Tan via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -2075,6 +2075,12 @@ private static void addDeprecatedKeys() {
*/ */
public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels."; public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels.";
/** Node label store implementation class */
public static final String FS_NODE_LABELS_STORE_IMPL_CLASS = NODE_LABELS_PREFIX
+ "fs-store.impl.class";
public static final String DEFAULT_FS_NODE_LABELS_STORE_IMPL_CLASS =
"org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore";
/** URI for NodeLabelManager */ /** URI for NodeLabelManager */
public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX
+ "fs-store.root-dir"; + "fs-store.root-dir";

View File

@ -48,6 +48,8 @@ public void initializeMemberVariables() {
errorIfMissingXmlProps = true; errorIfMissingXmlProps = true;
// Specific properties to skip // Specific properties to skip
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_IMPL_CLASS);
configurationPropsToSkipCompare configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS); .add(YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS);
configurationPropsToSkipCompare configurationPropsToSkipCompare

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -224,10 +225,20 @@ protected void serviceInit(Configuration conf) throws Exception {
labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL)); labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL));
} }
boolean isCentralizedConfiguration() {
return isCentralizedNodeLabelConfiguration;
}
protected void initNodeLabelStore(Configuration conf) throws Exception { protected void initNodeLabelStore(Configuration conf) throws Exception {
this.store = new FileSystemNodeLabelsStore(this); this.store =
ReflectionUtils
.newInstance(
conf.getClass(YarnConfiguration.FS_NODE_LABELS_STORE_IMPL_CLASS,
FileSystemNodeLabelsStore.class, NodeLabelsStore.class),
conf);
this.store.setNodeLabelsManager(this);
this.store.init(conf); this.store.init(conf);
this.store.recover(!isCentralizedNodeLabelConfiguration); this.store.recover();
} }
// for UT purpose // for UT purpose

View File

@ -52,11 +52,6 @@
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
public class FileSystemNodeLabelsStore extends NodeLabelsStore { public class FileSystemNodeLabelsStore extends NodeLabelsStore {
public FileSystemNodeLabelsStore(CommonNodeLabelsManager mgr) {
super(mgr);
}
protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class); protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class);
protected static final String DEFAULT_DIR_NAME = "node-labels"; protected static final String DEFAULT_DIR_NAME = "node-labels";
@ -69,8 +64,8 @@ protected enum SerializedLogType {
Path fsWorkingPath; Path fsWorkingPath;
FileSystem fs; FileSystem fs;
FSDataOutputStream editlogOs; private FSDataOutputStream editlogOs;
Path editLogPath; private Path editLogPath;
private String getDefaultFSNodeLabelsRootDir() throws IOException { private String getDefaultFSNodeLabelsRootDir() throws IOException {
// default is in local: /tmp/hadoop-yarn-${user}/node-labels/ // default is in local: /tmp/hadoop-yarn-${user}/node-labels/
@ -161,11 +156,39 @@ public void removeClusterNodeLabels(Collection<String> labels)
} }
} }
protected void loadFromMirror(Path newMirrorPath, Path oldMirrorPath)
throws IOException {
// If mirror.new exists, read from mirror.new,
FSDataInputStream is = null;
if (fs.exists(newMirrorPath)) {
is = fs.open(newMirrorPath);
} else if (fs.exists(oldMirrorPath)) {
is = fs.open(oldMirrorPath);
}
if (null != is) {
List<NodeLabel> labels = new AddToClusterNodeLabelsRequestPBImpl(
AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is))
.getNodeLabels();
mgr.addToCluserNodeLabels(labels);
if (mgr.isCentralizedConfiguration()) {
// Only load node to labels mapping while using centralized configuration
Map<NodeId, Set<String>> nodeToLabels =
new ReplaceLabelsOnNodeRequestPBImpl(
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
.getNodeToLabels();
mgr.replaceLabelsOnNode(nodeToLabels);
}
is.close();
}
}
/* (non-Javadoc) /* (non-Javadoc)
* @see org.apache.hadoop.yarn.nodelabels.NodeLabelsStore#recover(boolean) * @see org.apache.hadoop.yarn.nodelabels.NodeLabelsStore#recover(boolean)
*/ */
@Override @Override
public void recover(boolean ignoreNodeToLabelsMappings) throws YarnException, public void recover() throws YarnException,
IOException { IOException {
/* /*
* Steps of recover * Steps of recover
@ -182,30 +205,12 @@ public void recover(boolean ignoreNodeToLabelsMappings) throws YarnException,
Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME); Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME);
Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".old"); Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".old");
FSDataInputStream is = null; loadFromMirror(mirrorPath, oldMirrorPath);
if (fs.exists(mirrorPath)) {
is = fs.open(mirrorPath);
} else if (fs.exists(oldMirrorPath)) {
is = fs.open(oldMirrorPath);
}
if (null != is) {
List<NodeLabel> labels =
new AddToClusterNodeLabelsRequestPBImpl(
AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)).getNodeLabels();
Map<NodeId, Set<String>> nodeToLabels =
new ReplaceLabelsOnNodeRequestPBImpl(
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
.getNodeToLabels();
mgr.addToCluserNodeLabels(labels);
mgr.replaceLabelsOnNode(nodeToLabels);
is.close();
}
// Open and process editlog // Open and process editlog
editLogPath = new Path(fsWorkingPath, EDITLOG_FILENAME); editLogPath = new Path(fsWorkingPath, EDITLOG_FILENAME);
if (fs.exists(editLogPath)) { if (fs.exists(editLogPath)) {
is = fs.open(editLogPath); FSDataInputStream is = fs.open(editLogPath);
while (true) { while (true) {
try { try {
@ -233,7 +238,7 @@ public void recover(boolean ignoreNodeToLabelsMappings) throws YarnException,
new ReplaceLabelsOnNodeRequestPBImpl( new ReplaceLabelsOnNodeRequestPBImpl(
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is)) ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
.getNodeToLabels(); .getNodeToLabels();
if (!ignoreNodeToLabelsMappings) { if (mgr.isCentralizedConfiguration()) {
/* /*
* In case of Distributed NodeLabels setup, * In case of Distributed NodeLabels setup,
* ignoreNodeToLabelsMappings will be set to true and recover will * ignoreNodeToLabelsMappings will be set to true and recover will

View File

@ -31,11 +31,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
public abstract class NodeLabelsStore implements Closeable { public abstract class NodeLabelsStore implements Closeable {
protected final CommonNodeLabelsManager mgr; protected CommonNodeLabelsManager mgr;
public NodeLabelsStore(CommonNodeLabelsManager mgr) {
this.mgr = mgr;
}
/** /**
* Store node {@literal ->} label * Store node {@literal ->} label
@ -62,16 +58,14 @@ public abstract void removeClusterNodeLabels(Collection<String> labels)
* ignoreNodeToLabelsMappings will be set to true and recover will be invoked * ignoreNodeToLabelsMappings will be set to true and recover will be invoked
* as RM will collect the node labels from NM through registration/HB * as RM will collect the node labels from NM through registration/HB
* *
* @param ignoreNodeToLabelsMappings
* @throws IOException * @throws IOException
* @throws YarnException * @throws YarnException
*/ */
public abstract void recover(boolean ignoreNodeToLabelsMappings) public abstract void recover() throws IOException, YarnException;
throws IOException, YarnException;
public void init(Configuration conf) throws Exception {} public void init(Configuration conf) throws Exception {}
public CommonNodeLabelsManager getNodeLabelsManager() { public void setNodeLabelsManager(CommonNodeLabelsManager mgr) {
return mgr; this.mgr = mgr;
} }
} }

View File

@ -2465,4 +2465,12 @@
<name>yarn.am.blacklisting.disable-failure-threshold</name> <name>yarn.am.blacklisting.disable-failure-threshold</name>
<value>0.8f</value> <value>0.8f</value>
</property> </property>
<property>
<description>
Choose different implementation of node label's storage
</description>
<name>yarn.node-labels.fs-store.impl.class</name>
<value>org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore</value>
</property>
</configuration> </configuration>

View File

@ -36,10 +36,10 @@ public class DummyCommonNodeLabelsManager extends CommonNodeLabelsManager {
@Override @Override
public void initNodeLabelStore(Configuration conf) { public void initNodeLabelStore(Configuration conf) {
this.store = new NodeLabelsStore(this) { this.store = new NodeLabelsStore() {
@Override @Override
public void recover(boolean ignoreNodeToLabelsMappings) public void recover()
throws IOException { throws IOException {
} }
@ -65,6 +65,8 @@ public void close() throws IOException {
// do nothing // do nothing
} }
}; };
this.store.setNodeLabelsManager(this);
} }
@Override @Override

View File

@ -21,6 +21,7 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -33,13 +34,17 @@
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import com.google.common.collect.ImmutableMap; import org.junit.runners.Parameterized;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.google.common.collect.ImmutableMap;
@RunWith(Parameterized.class)
public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase { public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase {
MockNodeLabelManager mgr = null; MockNodeLabelManager mgr = null;
Configuration conf = null; Configuration conf = null;
String storeClassName = null;
private static class MockNodeLabelManager extends private static class MockNodeLabelManager extends
CommonNodeLabelsManager { CommonNodeLabelsManager {
@ -59,8 +64,15 @@ protected void stopDispatcher() {
} }
} }
private FileSystemNodeLabelsStore getStore() { public TestFileSystemNodeLabelsStore(String className) {
return (FileSystemNodeLabelsStore) mgr.store; this.storeClassName = className;
}
@Parameterized.Parameters
public static Collection<String[]> getParameters() {
return Arrays.asList(
new String[][] { { FileSystemNodeLabelsStore.class.getCanonicalName() },
{ NonAppendableFSNodeLabelStore.class.getCanonicalName() } });
} }
@Before @Before
@ -68,6 +80,7 @@ public void before() throws IOException {
mgr = new MockNodeLabelManager(); mgr = new MockNodeLabelManager();
conf = new Configuration(); conf = new Configuration();
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_IMPL_CLASS, storeClassName);
File tempDir = File.createTempFile("nlb", ".tmp"); File tempDir = File.createTempFile("nlb", ".tmp");
tempDir.delete(); tempDir.delete();
tempDir.mkdirs(); tempDir.mkdirs();
@ -80,7 +93,11 @@ public void before() throws IOException {
@After @After
public void after() throws IOException { public void after() throws IOException {
getStore().fs.delete(getStore().fsWorkingPath, true); if (mgr.store instanceof FileSystemNodeLabelsStore) {
FileSystemNodeLabelsStore fsStore =
((FileSystemNodeLabelsStore) mgr.store);
fsStore.fs.delete(fsStore.fsWorkingPath, true);
}
mgr.stop(); mgr.stop();
} }
@ -324,11 +341,12 @@ public void testSerilizationAfterRecovery() throws Exception {
@Test @Test
public void testRootMkdirOnInitStore() throws Exception { public void testRootMkdirOnInitStore() throws Exception {
final FileSystem mockFs = Mockito.mock(FileSystem.class); final FileSystem mockFs = Mockito.mock(FileSystem.class);
FileSystemNodeLabelsStore mockStore = new FileSystemNodeLabelsStore(mgr) { FileSystemNodeLabelsStore mockStore = new FileSystemNodeLabelsStore() {
void setFileSystem(Configuration conf) throws IOException { void setFileSystem(Configuration conf) throws IOException {
fs = mockFs; fs = mockFs;
} }
}; };
mockStore.setNodeLabelsManager(mgr);
mockStore.fs = mockFs; mockStore.fs = mockFs;
verifyMkdirsCount(mockStore, true, 0); verifyMkdirsCount(mockStore, true, 0);
verifyMkdirsCount(mockStore, false, 1); verifyMkdirsCount(mockStore, false, 1);

View File

@ -37,10 +37,10 @@ public class NullRMNodeLabelsManager extends RMNodeLabelsManager {
@Override @Override
public void initNodeLabelStore(Configuration conf) { public void initNodeLabelStore(Configuration conf) {
this.store = new NodeLabelsStore(this) { this.store = new NodeLabelsStore() {
@Override @Override
public void recover(boolean ignoreNodeToLabelsMappings) public void recover()
throws IOException { throws IOException {
// do nothing // do nothing
} }