YARN-7875. Node Attribute store for storing and recovering attributes. Contributed by Bibin A Chundatt.

This commit is contained in:
Sunil G 2018-04-06 07:09:27 +05:30
parent a6590c1f1f
commit b9890d1f66
18 changed files with 935 additions and 16 deletions

View File

@ -3480,6 +3480,22 @@ public class YarnConfiguration extends Configuration {
public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX
+ "fs-store.root-dir";
/**
* Node-attribute configurations.
*/
public static final String NODE_ATTRIBUTE_PREFIX =
YARN_PREFIX + "node-attribute.";
/**
* Node attribute store implementation class.
*/
public static final String FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS =
NODE_ATTRIBUTE_PREFIX + "fs-store.impl.class";
/**
* File system not attribute store directory.
*/
public static final String FS_NODE_ATTRIBUTE_STORE_ROOT_DIR =
NODE_ATTRIBUTE_PREFIX + "fs-store.root-dir";
/**
* Flag to indicate if the node labels feature enabled, by default it's
* disabled

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
/**
* Interface class for Node label store.
*/
public interface NodeAttributeStore extends Closeable {
/**
* Replace labels on node.
*
* @param nodeToAttribute node to attribute list.
* @throws IOException
*/
void replaceNodeAttributes(List<NodeToAttributes> nodeToAttribute)
throws IOException;
/**
* Add attribute to node.
*
* @param nodeToAttribute node to attribute list.
* @throws IOException
*/
void addNodeAttributes(List<NodeToAttributes> nodeToAttribute)
throws IOException;
/**
* Remove attribute from node.
*
* @param nodeToAttribute node to attribute list.
* @throws IOException
*/
void removeNodeAttributes(List<NodeToAttributes> nodeToAttribute)
throws IOException;
/**
* Initialize based on configuration and NodeAttributesManager.
*
* @param configuration configuration instance.
* @param mgr nodeattributemanager instance.
* @throws Exception
*/
void init(Configuration configuration, NodeAttributesManager mgr)
throws Exception;
/**
* Recover store on resourcemanager startup.
* @throws IOException
* @throws YarnException
*/
void recover() throws IOException, YarnException;
}

View File

@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.nodelabels;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
/**
* This class captures all interactions for Attributes with RM.
@ -101,6 +103,15 @@ public abstract class NodeAttributesManager extends AbstractService {
public abstract Map<NodeAttribute, AttributeValue> getAttributesForNode(
String hostName);
/**
* Get All node to Attributes list based on filter.
*
* @return List<NodeToAttributes> nodeToAttributes matching filter.If empty
* or null is passed as argument will return all.
*/
public abstract List<NodeToAttributes> getNodeToAttributes(
Set<String> prefix);
// futuristic
// public set<NodeId> getNodesMatchingExpression(String nodeLabelExp);
}

View File

@ -53,12 +53,6 @@ public class RMNodeAttribute extends AbstractLabel {
this.attribute = attribute;
}
public RMNodeAttribute(String attributeName) {
super(attributeName);
attribute = NodeAttribute.newInstance(attributeName,
NodeAttributeType.STRING, CommonNodeLabelsManager.NO_LABEL);
}
public NodeAttributeType getAttributeType() {
return attribute.getAttributeType();
}

View File

@ -64,7 +64,7 @@ public abstract class AbstractFSNodeStore<M> {
initFileSystem(conf);
// mkdir of root dir path
fs.mkdirs(fsWorkingPath);
LOG.info("Created store directory :" + fsWorkingPath);
}
/**

View File

@ -17,13 +17,17 @@
*/
package org.apache.hadoop.yarn.nodelabels.store;
import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler
.StoreType.NODE_LABEL_STORE;
import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType.NODE_ATTRIBUTE;
import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType.NODE_LABEL_STORE;
import org.apache.hadoop.yarn.nodelabels.store.op.AddClusterLabelOp;
import org.apache.hadoop.yarn.nodelabels.store.op.AddNodeToAttributeLogOp;
import org.apache.hadoop.yarn.nodelabels.store.op.FSNodeStoreLogOp;
import org.apache.hadoop.yarn.nodelabels.store.op.NodeAttributeMirrorOp;
import org.apache.hadoop.yarn.nodelabels.store.op.NodeLabelMirrorOp;
import org.apache.hadoop.yarn.nodelabels.store.op.NodeToLabelOp;
import org.apache.hadoop.yarn.nodelabels.store.op.RemoveClusterLabelOp;
import org.apache.hadoop.yarn.nodelabels.store.op.RemoveNodeToAttributeLogOp;
import org.apache.hadoop.yarn.nodelabels.store.op.ReplaceNodeToAttributeLogOp;
import java.util.HashMap;
import java.util.Map;
@ -39,7 +43,7 @@ public class FSStoreOpHandler {
public enum StoreType {
NODE_LABEL_STORE,
NODE_LABEL_ATTRIBUTE;
NODE_ATTRIBUTE
}
static {
@ -47,13 +51,24 @@ public class FSStoreOpHandler {
mirrorOp = new HashMap<>();
// registerLog edit log operation
//Node Label Operations
registerLog(NODE_LABEL_STORE, AddClusterLabelOp.OPCODE, AddClusterLabelOp.class);
registerLog(NODE_LABEL_STORE, NodeToLabelOp.OPCODE, NodeToLabelOp.class);
registerLog(NODE_LABEL_STORE, RemoveClusterLabelOp.OPCODE, RemoveClusterLabelOp.class);
//NodeAttibute operation
registerLog(NODE_ATTRIBUTE, AddNodeToAttributeLogOp.OPCODE, AddNodeToAttributeLogOp.class);
registerLog(NODE_ATTRIBUTE, RemoveNodeToAttributeLogOp.OPCODE, RemoveNodeToAttributeLogOp.class);
registerLog(NODE_ATTRIBUTE, ReplaceNodeToAttributeLogOp.OPCODE, ReplaceNodeToAttributeLogOp.class);
// registerLog Mirror op
// Node label mirror operation
registerMirror(NODE_LABEL_STORE, NodeLabelMirrorOp.class);
//Node attribute mirror operation
registerMirror(NODE_ATTRIBUTE, NodeAttributeMirrorOp.class);
}
private static void registerMirror(StoreType type,

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store.op;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
/**
* File system Add Node to attribute mapping.
*/
public class AddNodeToAttributeLogOp
extends FSNodeStoreLogOp<NodeAttributesManager> {
private List<NodeToAttributes> attributes;
public static final int OPCODE = 0;
@Override
public void write(OutputStream os, NodeAttributesManager mgr)
throws IOException {
((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
.newInstance(AttributeMappingOperationType.ADD, attributes, false))
.getProto().writeDelimitedTo(os);
}
@Override
public void recover(InputStream is, NodeAttributesManager mgr)
throws IOException {
NodesToAttributesMappingRequest request =
new NodesToAttributesMappingRequestPBImpl(
YarnServerResourceManagerServiceProtos
.NodesToAttributesMappingRequestProto
.parseDelimitedFrom(is));
mgr.addNodeAttributes(getNodeToAttributesMap(request));
}
public AddNodeToAttributeLogOp setAttributes(
List<NodeToAttributes> attributesList) {
this.attributes = attributesList;
return this;
}
@Override
public int getOpCode() {
return OPCODE;
}
}

View File

@ -17,10 +17,18 @@
*/
package org.apache.hadoop.yarn.nodelabels.store.op;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.nodelabels.store.StoreOp;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Defines all FileSystem editlog operation. All node label and attribute
@ -32,4 +40,13 @@ public abstract class FSNodeStoreLogOp<M>
implements StoreOp<OutputStream, InputStream, M> {
public abstract int getOpCode();
protected Map<String, Set<NodeAttribute>> getNodeToAttributesMap(
NodesToAttributesMappingRequest request) {
List<NodeToAttributes> attributes = request.getNodesToAttributes();
Map<String, Set<NodeAttribute>> nodeToAttrMap = new HashMap<>();
attributes.forEach((v) -> nodeToAttrMap
.put(v.getNode(), new HashSet<>(v.getNodeAttributes())));
return nodeToAttrMap;
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store.op;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* File System Node Attribute Mirror read and write operation.
*/
public class NodeAttributeMirrorOp
extends FSNodeStoreLogOp<NodeAttributesManager> {
@Override
public void write(OutputStream os, NodeAttributesManager mgr)
throws IOException {
((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
.newInstance(AttributeMappingOperationType.REPLACE,
mgr.getNodeToAttributes(
ImmutableSet.of(NodeAttribute.PREFIX_CENTRALIZED)), false))
.getProto().writeDelimitedTo(os);
}
@Override
public void recover(InputStream is, NodeAttributesManager mgr)
throws IOException {
NodesToAttributesMappingRequest request =
new NodesToAttributesMappingRequestPBImpl(
YarnServerResourceManagerServiceProtos
.NodesToAttributesMappingRequestProto
.parseDelimitedFrom(is));
mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
getNodeToAttributesMap(request));
}
@Override
public int getOpCode() {
return -1;
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store.op;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
/**
* File system remove node attribute from node operation.
*/
public class RemoveNodeToAttributeLogOp
extends FSNodeStoreLogOp<NodeAttributesManager> {
private List<NodeToAttributes> attributes;
public static final int OPCODE = 1;
@Override
public void write(OutputStream os, NodeAttributesManager mgr)
throws IOException {
((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
.newInstance(AttributeMappingOperationType.REMOVE, attributes, false))
.getProto().writeDelimitedTo(os);
}
@Override
public void recover(InputStream is, NodeAttributesManager mgr)
throws IOException {
NodesToAttributesMappingRequest request =
new NodesToAttributesMappingRequestPBImpl(
YarnServerResourceManagerServiceProtos
.NodesToAttributesMappingRequestProto
.parseDelimitedFrom(is));
mgr.removeNodeAttributes(getNodeToAttributesMap(request));
}
public RemoveNodeToAttributeLogOp setAttributes(
List<NodeToAttributes> attrs) {
this.attributes = attrs;
return this;
}
@Override
public int getOpCode() {
return OPCODE;
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store.op;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
/**
* File system replace node attribute from node operation.
*/
public class ReplaceNodeToAttributeLogOp
extends FSNodeStoreLogOp<NodeAttributesManager> {
private List<NodeToAttributes> attributes;
public static final int OPCODE = 2;
@Override
public void write(OutputStream os, NodeAttributesManager mgr)
throws IOException {
((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
.newInstance(AttributeMappingOperationType.REPLACE, attributes, false))
.getProto().writeDelimitedTo(os);
}
@Override
public void recover(InputStream is, NodeAttributesManager mgr)
throws IOException {
NodesToAttributesMappingRequest request =
new NodesToAttributesMappingRequestPBImpl(
YarnServerResourceManagerServiceProtos
.NodesToAttributesMappingRequestProto
.parseDelimitedFrom(is));
//Only CENTRALIZED is stored to FS system
mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
getNodeToAttributesMap(request));
}
public ReplaceNodeToAttributeLogOp setAttributes(
List<NodeToAttributes> attrs) {
this.attributes = attrs;
return this;
}
@Override
public int getOpCode() {
return OPCODE;
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
package org.apache.hadoop.yarn.nodelabels.store.op;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -3965,4 +3965,20 @@
<name>yarn.nodemanager.elastic-memory-control.timeout-sec</name>
<value>5</value>
</property>
<property>
<description>
URI for NodeAttributeManager. The default value is
/tmp/hadoop-yarn-${user}/node-attribute/ in the local filesystem.
</description>
<name>yarn.node-attribute.fs-store.root-dir</name>
<value></value>
</property>
<property>
<description>
Choose different implementation of node attribute's storage
</description>
<name>yarn.node-attribute.fs-store.impl.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore</value>
</property>
</configuration>

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.nodelabels.store.AbstractFSNodeStore;
import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler;
import org.apache.hadoop.yarn.nodelabels.store.op.AddNodeToAttributeLogOp;
import org.apache.hadoop.yarn.nodelabels.store.op.RemoveNodeToAttributeLogOp;
import org.apache.hadoop.yarn.nodelabels.store.op.ReplaceNodeToAttributeLogOp;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import java.io.IOException;
import java.util.List;
/**
* File system node attribute implementation.
*/
public class FileSystemNodeAttributeStore
extends AbstractFSNodeStore<NodeAttributesManager>
implements NodeAttributeStore {
protected static final Log LOG =
LogFactory.getLog(FileSystemNodeAttributeStore.class);
protected static final String DEFAULT_DIR_NAME = "node-attribute";
protected static final String MIRROR_FILENAME = "nodeattribute.mirror";
protected static final String EDITLOG_FILENAME = "nodeattribute.editlog";
public FileSystemNodeAttributeStore() {
super(FSStoreOpHandler.StoreType.NODE_ATTRIBUTE);
}
private String getDefaultFSNodeAttributeRootDir() throws IOException {
// default is in local: /tmp/hadoop-yarn-${user}/node-attribute/
return "file:///tmp/hadoop-yarn-" + UserGroupInformation.getCurrentUser()
.getShortUserName() + "/" + DEFAULT_DIR_NAME;
}
@Override
public void init(Configuration conf, NodeAttributesManager mgr)
throws Exception {
StoreSchema schema = new StoreSchema(EDITLOG_FILENAME, MIRROR_FILENAME);
initStore(conf, new Path(
conf.get(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
getDefaultFSNodeAttributeRootDir())), schema, mgr);
}
@Override
public void replaceNodeAttributes(List<NodeToAttributes> nodeToAttribute)
throws IOException {
ReplaceNodeToAttributeLogOp op = new ReplaceNodeToAttributeLogOp();
writeToLog(op.setAttributes(nodeToAttribute));
}
@Override
public void addNodeAttributes(List<NodeToAttributes> nodeAttributeMapping)
throws IOException {
AddNodeToAttributeLogOp op = new AddNodeToAttributeLogOp();
writeToLog(op.setAttributes(nodeAttributeMapping));
}
@Override
public void removeNodeAttributes(List<NodeToAttributes> nodeAttributeMapping)
throws IOException {
RemoveNodeToAttributeLogOp op = new RemoveNodeToAttributeLogOp();
writeToLog(op.setAttributes(nodeAttributeMapping));
}
@Override
public void recover() throws IOException, YarnException {
super.recoverFromStore();
}
@Override
public void close() throws IOException {
super.closeFSStore();
}
}

View File

@ -32,24 +32,31 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.ArrayList;
import java.util.List;
import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.apache.hadoop.yarn.nodelabels.RMNodeAttribute;
import org.apache.hadoop.yarn.nodelabels.StringAttributeValue;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
/**
* Manager holding the attributes to Labels.
@ -63,7 +70,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
*/
public static final String EMPTY_ATTRIBUTE_VALUE = "";
private Dispatcher dispatcher;
Dispatcher dispatcher;
NodeAttributeStore store;
// TODO may be we can have a better collection here.
// this will be updated to get the attributeName to NM mapping
@ -121,7 +129,21 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
protected void initNodeAttributeStore(Configuration conf) throws Exception {
// TODO to generalize and make use of the FileSystemNodeLabelsStore
this.store =getAttributeStoreClass(conf);
this.store.init(conf, this);
this.store.recover();
}
private NodeAttributeStore getAttributeStoreClass(Configuration conf) {
try {
return ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
FileSystemNodeAttributeStore.class, NodeAttributeStore.class),
conf);
} catch (Exception e) {
throw new YarnRuntimeException(
"Could not instantiate Node Attribute Store ", e);
}
}
private void internalUpdateAttributesOnNodes(
@ -174,7 +196,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
LOG.info(logMsg);
if (null != dispatcher) {
if (null != dispatcher && NodeAttribute.PREFIX_CENTRALIZED
.equals(attributePrefix)) {
dispatcher.getEventHandler()
.handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op));
}
@ -382,6 +405,32 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
}
@Override
public List<NodeToAttributes> getNodeToAttributes(Set<String> prefix) {
try {
readLock.lock();
List<NodeToAttributes> nodeToAttributes = new ArrayList<>();
nodeCollections.forEach((k, v) -> {
List<NodeAttribute> attrs;
if (prefix == null || prefix.isEmpty()) {
attrs = new ArrayList<>(v.getAttributes().keySet());
} else {
attrs = new ArrayList<>();
for (Entry<NodeAttribute, AttributeValue> nodeAttr : v.attributes
.entrySet()) {
if (prefix.contains(nodeAttr.getKey().getAttributePrefix())) {
attrs.add(nodeAttr.getKey());
}
}
}
nodeToAttributes.add(NodeToAttributes.newInstance(k, attrs));
});
return nodeToAttributes;
} finally {
readLock.unlock();
}
}
public void activateNode(NodeId nodeId, Resource resource) {
try {
writeLock.lock();
@ -524,7 +573,29 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
// Dispatcher related code
protected void handleStoreEvent(NodeAttributesStoreEvent event) {
// TODO Need to extend the File
List<NodeToAttributes> mappingList = new ArrayList<>();
Map<String, Map<NodeAttribute, AttributeValue>> nodeToAttr =
event.getNodeAttributeMappingList();
nodeToAttr.forEach((k, v) -> mappingList
.add(NodeToAttributes.newInstance(k, new ArrayList<>(v.keySet()))));
try {
switch (event.getOperation()) {
case REPLACE:
store.replaceNodeAttributes(mappingList);
break;
case ADD:
store.addNodeAttributes(mappingList);
break;
case REMOVE:
store.removeNodeAttributes(mappingList);
break;
default:
LOG.warn("Unsupported operation");
}
} catch (IOException e) {
LOG.error("Failed to store attribute modification to storage");
throw new YarnRuntimeException(e);
}
}
@Override
@ -549,7 +620,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
private void processMapping(
Map<String, Set<NodeAttribute>> nodeAttributeMapping,
AttributeMappingOperationType mappingType) throws IOException {
processMapping(nodeAttributeMapping, mappingType, null);
processMapping(nodeAttributeMapping, mappingType,
NodeAttribute.PREFIX_CENTRALIZED);
}
private void processMapping(
@ -564,4 +636,22 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
internalUpdateAttributesOnNodes(validMapping, mappingType,
newAttributesToBeAdded, attributePrefix);
}
protected void stopDispatcher() {
AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
if (null != asyncDispatcher) {
asyncDispatcher.stop();
}
}
@Override
protected void serviceStop() throws Exception {
// finalize store
stopDispatcher();
// only close store when we enabled store persistent
if (null != store) {
store.close();
}
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -828,6 +830,14 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
File tempDir = File.createTempFile("nattr", ".tmp");
tempDir.delete();
tempDir.mkdirs();
tempDir.deleteOnExit();
conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
tempDir.getAbsolutePath());
rm = new MockRM(conf);
rm.start();

View File

@ -0,0 +1,260 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class TestFileSystemNodeAttributeStore {
private MockNodeAttrbuteManager mgr = null;
private Configuration conf = null;
private static class MockNodeAttrbuteManager
extends NodeAttributesManagerImpl {
@Override
protected void initDispatcher(Configuration conf) {
super.dispatcher = new InlineDispatcher();
}
@Override
protected void startDispatcher() {
//Do nothing
}
@Override
protected void stopDispatcher() {
//Do nothing
}
}
@Before
public void before() throws IOException {
mgr = new MockNodeAttrbuteManager();
conf = new Configuration();
conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
File tempDir = File.createTempFile("nattr", ".tmp");
tempDir.delete();
tempDir.mkdirs();
tempDir.deleteOnExit();
conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
tempDir.getAbsolutePath());
mgr.init(conf);
mgr.start();
}
@After
public void after() throws IOException {
FileSystemNodeAttributeStore fsStore =
((FileSystemNodeAttributeStore) mgr.store);
fsStore.getFs().delete(fsStore.getFsWorkingPath(), true);
mgr.stop();
}
@Test(timeout = 10000)
public void testRecoverWithMirror() throws Exception {
//------host0----
// add -GPU & FPGA
// remove -GPU
// replace -Docker
//------host1----
// add--GPU
NodeAttribute docker = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "DOCKER",
NodeAttributeType.STRING, "docker-0");
NodeAttribute gpu = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvidia");
NodeAttribute fpga = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "FPGA",
NodeAttributeType.STRING, "asus");
Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
toAddAttributes.put("host0", ImmutableSet.of(gpu, fpga));
toAddAttributes.put("host1", ImmutableSet.of(gpu));
// Add node attribute
mgr.addNodeAttributes(toAddAttributes);
Assert.assertEquals("host0 size", 2,
mgr.getAttributesForNode("host0").size());
// Add test to remove
toAddAttributes.clear();
toAddAttributes.put("host0", ImmutableSet.of(gpu));
mgr.removeNodeAttributes(toAddAttributes);
// replace nodeattribute
toAddAttributes.clear();
toAddAttributes.put("host0", ImmutableSet.of(docker));
mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
toAddAttributes);
Map<NodeAttribute, AttributeValue> attrs =
mgr.getAttributesForNode("host0");
Assert.assertEquals(attrs.size(), 1);
Assert.assertEquals(attrs.keySet().toArray()[0], docker);
mgr.stop();
// Start new attribute manager with same path
mgr = new MockNodeAttrbuteManager();
mgr.init(conf);
mgr.start();
mgr.getAttributesForNode("host0");
Assert.assertEquals("host0 size", 1,
mgr.getAttributesForNode("host0").size());
Assert.assertEquals("host1 size", 1,
mgr.getAttributesForNode("host1").size());
attrs = mgr.getAttributesForNode("host0");
Assert.assertEquals(attrs.size(), 1);
Assert.assertEquals(attrs.keySet().toArray()[0], docker);
//------host0----
// current - docker
// replace - gpu
//----- host1----
// current - gpu
// add - docker
toAddAttributes.clear();
toAddAttributes.put("host0", ImmutableSet.of(gpu));
mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
toAddAttributes);
toAddAttributes.clear();
toAddAttributes.put("host1", ImmutableSet.of(docker));
mgr.addNodeAttributes(toAddAttributes);
// Recover from mirror and edit log
mgr.stop();
mgr = new MockNodeAttrbuteManager();
mgr.init(conf);
mgr.start();
Assert.assertEquals("host0 size", 1,
mgr.getAttributesForNode("host0").size());
Assert.assertEquals("host1 size", 2,
mgr.getAttributesForNode("host1").size());
attrs = mgr.getAttributesForNode("host0");
Assert.assertEquals(attrs.size(), 1);
Assert.assertEquals(attrs.keySet().toArray()[0], gpu);
attrs = mgr.getAttributesForNode("host1");
Assert.assertTrue(attrs.keySet().contains(docker));
Assert.assertTrue(attrs.keySet().contains(gpu));
}
@Test(timeout = 10000)
public void testRecoverFromEditLog() throws Exception {
NodeAttribute docker = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "DOCKER",
NodeAttributeType.STRING, "docker-0");
NodeAttribute gpu = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvidia");
NodeAttribute fpga = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "FPGA",
NodeAttributeType.STRING, "asus");
Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
toAddAttributes.put("host0", ImmutableSet.of(gpu, fpga));
toAddAttributes.put("host1", ImmutableSet.of(docker));
// Add node attribute
mgr.addNodeAttributes(toAddAttributes);
Assert.assertEquals("host0 size", 2,
mgr.getAttributesForNode("host0").size());
// Increase editlog operation
for (int i = 0; i < 5; i++) {
// Add gpu host1
toAddAttributes.clear();
toAddAttributes.put("host0", ImmutableSet.of(gpu));
mgr.removeNodeAttributes(toAddAttributes);
// Add gpu host1
toAddAttributes.clear();
toAddAttributes.put("host1", ImmutableSet.of(docker));
mgr.addNodeAttributes(toAddAttributes);
// Remove GPU replace
toAddAttributes.clear();
toAddAttributes.put("host0", ImmutableSet.of(gpu));
mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
toAddAttributes);
// Add fgpa host1
toAddAttributes.clear();
toAddAttributes.put("host1", ImmutableSet.of(gpu));
mgr.addNodeAttributes(toAddAttributes);
}
mgr.stop();
// Start new attribute manager with same path
mgr = new MockNodeAttrbuteManager();
mgr.init(conf);
mgr.start();
Assert.assertEquals("host0 size", 1,
mgr.getAttributesForNode("host0").size());
Assert.assertEquals("host1 size", 2,
mgr.getAttributesForNode("host1").size());
toAddAttributes.clear();
NodeAttribute replaced =
NodeAttribute.newInstance("GPU2", NodeAttributeType.STRING, "nvidia2");
toAddAttributes.put("host0", ImmutableSet.of(replaced));
mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
toAddAttributes);
mgr.stop();
mgr = new MockNodeAttrbuteManager();
mgr.init(conf);
mgr.start();
Map<NodeAttribute, AttributeValue> valueMap =
mgr.getAttributesForNode("host0");
Map.Entry<NodeAttribute, AttributeValue> entry =
valueMap.entrySet().iterator().next();
NodeAttribute attribute = entry.getKey();
Assert.assertEquals("host0 size", 1,
mgr.getAttributesForNode("host0").size());
Assert.assertEquals("host1 size", 2,
mgr.getAttributesForNode("host1").size());
checkNodeAttributeEqual(replaced, attribute);
}
public void checkNodeAttributeEqual(NodeAttribute atr1, NodeAttribute atr2) {
Assert.assertEquals(atr1.getAttributeType(), atr2.getAttributeType());
Assert.assertEquals(atr1.getAttributeName(), atr2.getAttributeName());
Assert.assertEquals(atr1.getAttributePrefix(), atr2.getAttributePrefix());
Assert.assertEquals(atr1.getAttributeValue(), atr2.getAttributeValue());
}
}

View File

@ -23,7 +23,9 @@ import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.junit.Test;
@ -31,6 +33,7 @@ import org.junit.Before;
import org.junit.After;
import org.junit.Assert;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
@ -49,9 +52,17 @@ public class TestNodeAttributesManager {
new String[] {"host1", "host2", "host3"};
@Before
public void init() {
public void init() throws IOException {
Configuration conf = new Configuration();
attributesManager = new NodeAttributesManagerImpl();
conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
File tempDir = File.createTempFile("nattr", ".tmp");
tempDir.delete();
tempDir.mkdirs();
tempDir.deleteOnExit();
conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
tempDir.getAbsolutePath());
attributesManager.init(conf);
attributesManager.start();
}