YARN-8739. Fix jenkins issues for Node Attributes branch. Contributed by Sunil Govindan.
This commit is contained in:
parent
1e7d6e55a5
commit
c44088ac19
|
@ -574,7 +574,10 @@ public abstract class HAAdmin extends Configured implements Tool {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* UsageInfo class holds args and help details.
|
||||
*/
|
||||
public static class UsageInfo {
|
||||
public final String args;
|
||||
public final String help;
|
||||
|
|
|
@ -207,7 +207,7 @@ public class RMNodeWrapper implements RMNode {
|
|||
public Map<String, Long> getAllocationTagsWithCount() {
|
||||
return node.getAllocationTagsWithCount();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Set<NodeAttribute> getAllNodeAttributes() {
|
||||
return node.getAllNodeAttributes();
|
||||
|
|
|
@ -54,12 +54,13 @@ public abstract class GetAttributesToNodesResponse {
|
|||
|
||||
/**
|
||||
* Get mapping of NodeAttributeKey to its associated mapping of list of
|
||||
* NodeToAttributeValuenode to attribute value.
|
||||
* NodeToAttributeValue associated with attribute.
|
||||
*
|
||||
* @return Map<NodeAttributeKey, List<NodeToAttributeValue>> node attributes
|
||||
* to list of NodeToAttributeValuenode.
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract Map<NodeAttributeKey, List<NodeToAttributeValue>> getAttributesToNodes();
|
||||
public abstract Map<NodeAttributeKey,
|
||||
List<NodeToAttributeValue>> getAttributesToNodes();
|
||||
}
|
||||
|
|
|
@ -150,7 +150,7 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
|
|||
|
||||
@Private
|
||||
@Idempotent
|
||||
public NodesToAttributesMappingResponse mapAttributesToNodes(
|
||||
NodesToAttributesMappingResponse mapAttributesToNodes(
|
||||
NodesToAttributesMappingRequest request) throws YarnException,
|
||||
IOException;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,10 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
|
|||
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* NodesToAttributesMappingResponse holds response object for attribute
|
||||
* mapping.
|
||||
*/
|
||||
public class NodesToAttributesMappingResponse {
|
||||
public static NodesToAttributesMappingResponse newInstance() {
|
||||
return Records.newRecord(NodesToAttributesMappingResponse.class);
|
||||
|
|
|
@ -936,7 +936,8 @@ public abstract class YarnClient extends AbstractService {
|
|||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract Map<NodeAttributeKey, List<NodeToAttributeValue>> getAttributesToNodes(
|
||||
public abstract Map<NodeAttributeKey,
|
||||
List<NodeToAttributeValue>> getAttributesToNodes(
|
||||
Set<NodeAttributeKey> attributes) throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
|
|
|
@ -100,8 +100,9 @@ public class GetAttributesToNodesResponsePBImpl
|
|||
Iterable<AttributeToNodesProto> iterable =
|
||||
() -> new Iterator<AttributeToNodesProto>() {
|
||||
|
||||
private Iterator<Map.Entry<NodeAttributeKey, List<NodeToAttributeValue>>> iter =
|
||||
attributesToNodes.entrySet().iterator();
|
||||
private Iterator<Map.Entry<NodeAttributeKey,
|
||||
List<NodeToAttributeValue>>> iter = attributesToNodes.entrySet()
|
||||
.iterator();
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
|
@ -198,7 +199,8 @@ public class GetAttributesToNodesResponsePBImpl
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<NodeAttributeKey, List<NodeToAttributeValue>> getAttributesToNodes() {
|
||||
public Map<NodeAttributeKey,
|
||||
List<NodeToAttributeValue>> getAttributesToNodes() {
|
||||
initAttributesToNodes();
|
||||
return this.attributesToNodes;
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.api.records.NodeLabel;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.nodelabels.store.AbstractFSNodeStore;
|
||||
import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler;
|
||||
|
||||
import org.apache.hadoop.yarn.nodelabels.store.op.AddClusterLabelOp;
|
||||
import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType;
|
||||
|
@ -41,6 +40,9 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* FileSystemNodeLabelsStore for storing node labels.
|
||||
*/
|
||||
public class FileSystemNodeLabelsStore
|
||||
extends AbstractFSNodeStore<CommonNodeLabelsManager>
|
||||
implements NodeLabelsStore {
|
||||
|
|
|
@ -96,7 +96,8 @@ public abstract class NodeAttributesManager extends AbstractService {
|
|||
* @return a Map of attributeKeys to a map of hostnames to its attribute
|
||||
* values.
|
||||
*/
|
||||
public abstract Map<NodeAttributeKey, Map<String, AttributeValue>> getAttributesToNodes(
|
||||
public abstract Map<NodeAttributeKey,
|
||||
Map<String, AttributeValue>> getAttributesToNodes(
|
||||
Set<NodeAttributeKey> attributes);
|
||||
|
||||
/**
|
||||
|
|
|
@ -36,7 +36,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
/**
|
||||
* Store implementation for Non Appendable File Store
|
||||
* Store implementation for Non Appendable File Store.
|
||||
*/
|
||||
public class NonAppendableFSNodeLabelStore extends FileSystemNodeLabelsStore {
|
||||
protected static final Log
|
||||
|
|
|
@ -30,7 +30,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|||
/**
|
||||
* Partition representation in RM.
|
||||
*/
|
||||
public class RMNodeLabel extends AbstractLabel implements Comparable<RMNodeLabel> {
|
||||
public class RMNodeLabel extends AbstractLabel
|
||||
implements Comparable<RMNodeLabel> {
|
||||
private boolean exclusive;
|
||||
private NodeLabel nodeLabel;
|
||||
private Set<NodeId> nodeIds;
|
||||
|
|
|
@ -182,14 +182,14 @@ public abstract class AbstractFSNodeStore<M> {
|
|||
LOG.info("Finished create editlog file at:" + editLogPath.toString());
|
||||
}
|
||||
|
||||
protected void loadManagerFromEditLog(Path editLogPath) throws IOException {
|
||||
if (!fs.exists(editLogPath)) {
|
||||
protected void loadManagerFromEditLog(Path editPath) throws IOException {
|
||||
if (!fs.exists(editPath)) {
|
||||
return;
|
||||
}
|
||||
try (FSDataInputStream is = fs.open(editLogPath)) {
|
||||
try (FSDataInputStream is = fs.open(editPath)) {
|
||||
while (true) {
|
||||
try {
|
||||
StoreOp storeOp = FSStoreOpHandler.get(is.readInt(),storeType);
|
||||
StoreOp storeOp = FSStoreOpHandler.get(is.readInt(), storeType);
|
||||
storeOp.recover(is, manager);
|
||||
} catch (EOFException e) {
|
||||
// EOF hit, break
|
||||
|
|
|
@ -41,6 +41,9 @@ public class FSStoreOpHandler {
|
|||
editLogOp;
|
||||
private static Map<StoreType, Class<? extends FSNodeStoreLogOp>> mirrorOp;
|
||||
|
||||
/**
|
||||
* Store Type enum to hold label and attribute.
|
||||
*/
|
||||
public enum StoreType {
|
||||
NODE_LABEL_STORE,
|
||||
NODE_ATTRIBUTE
|
||||
|
@ -53,14 +56,19 @@ public class FSStoreOpHandler {
|
|||
// registerLog edit log operation
|
||||
|
||||
//Node Label Operations
|
||||
registerLog(NODE_LABEL_STORE, AddClusterLabelOp.OPCODE, AddClusterLabelOp.class);
|
||||
registerLog(NODE_LABEL_STORE, AddClusterLabelOp.OPCODE,
|
||||
AddClusterLabelOp.class);
|
||||
registerLog(NODE_LABEL_STORE, NodeToLabelOp.OPCODE, NodeToLabelOp.class);
|
||||
registerLog(NODE_LABEL_STORE, RemoveClusterLabelOp.OPCODE, RemoveClusterLabelOp.class);
|
||||
registerLog(NODE_LABEL_STORE, RemoveClusterLabelOp.OPCODE,
|
||||
RemoveClusterLabelOp.class);
|
||||
|
||||
//NodeAttibute operation
|
||||
registerLog(NODE_ATTRIBUTE, AddNodeToAttributeLogOp.OPCODE, AddNodeToAttributeLogOp.class);
|
||||
registerLog(NODE_ATTRIBUTE, RemoveNodeToAttributeLogOp.OPCODE, RemoveNodeToAttributeLogOp.class);
|
||||
registerLog(NODE_ATTRIBUTE, ReplaceNodeToAttributeLogOp.OPCODE, ReplaceNodeToAttributeLogOp.class);
|
||||
registerLog(NODE_ATTRIBUTE, AddNodeToAttributeLogOp.OPCODE,
|
||||
AddNodeToAttributeLogOp.class);
|
||||
registerLog(NODE_ATTRIBUTE, RemoveNodeToAttributeLogOp.OPCODE,
|
||||
RemoveNodeToAttributeLogOp.class);
|
||||
registerLog(NODE_ATTRIBUTE, ReplaceNodeToAttributeLogOp.OPCODE,
|
||||
ReplaceNodeToAttributeLogOp.class);
|
||||
|
||||
// registerLog Mirror op
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ import java.io.IOException;
|
|||
public interface StoreOp<W, R, M> {
|
||||
|
||||
/**
|
||||
* Write operation to persistent storage
|
||||
* Write operation to persistent storage.
|
||||
*
|
||||
* @param write write to be done to
|
||||
* @param mgr manager used by store
|
||||
|
@ -39,7 +39,7 @@ public interface StoreOp<W, R, M> {
|
|||
void write(W write, M mgr) throws IOException;
|
||||
|
||||
/**
|
||||
* Read and populate StoreOp
|
||||
* Read and populate StoreOp.
|
||||
*
|
||||
* @param read read to be done from
|
||||
* @param mgr manager used by store
|
||||
|
|
|
@ -57,8 +57,8 @@ public class AddClusterLabelOp
|
|||
mgr.addToCluserNodeLabels(labels);
|
||||
}
|
||||
|
||||
public AddClusterLabelOp setLabels(List<NodeLabel> labels) {
|
||||
this.labels = labels;
|
||||
public AddClusterLabelOp setLabels(List<NodeLabel> nodeLabels) {
|
||||
this.labels = nodeLabels;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -35,6 +35,9 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* NodeLabel Mirror Op class.
|
||||
*/
|
||||
public class NodeLabelMirrorOp
|
||||
extends FSNodeStoreLogOp<CommonNodeLabelsManager> {
|
||||
|
||||
|
|
|
@ -59,8 +59,8 @@ public class NodeToLabelOp
|
|||
}
|
||||
|
||||
public NodeToLabelOp setNodeToLabels(
|
||||
Map<NodeId, Set<String>> nodeToLabels) {
|
||||
this.nodeToLabels = nodeToLabels;
|
||||
Map<NodeId, Set<String>> nodeToLabelsList) {
|
||||
this.nodeToLabels = nodeToLabelsList;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -59,8 +59,8 @@ public class RemoveClusterLabelOp
|
|||
mgr.removeFromClusterNodeLabels(labels);
|
||||
}
|
||||
|
||||
public RemoveClusterLabelOp setLabels(Collection<String> labels) {
|
||||
this.labels = labels;
|
||||
public RemoveClusterLabelOp setLabels(Collection<String> nodeLabels) {
|
||||
this.labels = nodeLabels;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,9 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeToAttributesProto;
|
|||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeToAttributesProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
|
||||
|
||||
/**
|
||||
* Proto class for Node to attributes mapping.
|
||||
*/
|
||||
public class NodeToAttributesPBImpl extends NodeToAttributes {
|
||||
private NodeToAttributesProto proto =
|
||||
NodeToAttributesProto.getDefaultInstance();
|
||||
|
|
|
@ -29,6 +29,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperati
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
|
||||
|
||||
/**
|
||||
* Proto class for node to attributes mapping request.
|
||||
*/
|
||||
public class NodesToAttributesMappingRequestPBImpl
|
||||
extends NodesToAttributesMappingRequest {
|
||||
private NodesToAttributesMappingRequestProto proto =
|
||||
|
|
|
@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
|||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.NodesToAttributesMappingResponseProto;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
|
||||
|
||||
/**
|
||||
* Proto class for node to attributes mapping response.
|
||||
*/
|
||||
public class NodesToAttributesMappingResponsePBImpl
|
||||
extends NodesToAttributesMappingResponse {
|
||||
|
||||
|
|
|
@ -28,6 +28,9 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords
|
||||
.RemoveFromClusterNodeLabelsRequest;
|
||||
|
||||
/**
|
||||
* Proto class to handlde RemoveFromClusterNodeLabels request.
|
||||
*/
|
||||
public class RemoveFromClusterNodeLabelsRequestPBImpl
|
||||
extends RemoveFromClusterNodeLabelsRequest {
|
||||
Set<String> labels;
|
||||
|
|
|
@ -465,7 +465,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
|
|||
generateByNewInstance(NodeToAttributeValue.class);
|
||||
generateByNewInstance(NodeAttributeInfo.class);
|
||||
generateByNewInstance(NodesToAttributesMappingRequest.class);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllocateRequestPBImpl() throws Exception {
|
||||
|
|
|
@ -360,7 +360,7 @@ public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase {
|
|||
Mockito.when(store.getFs().exists(Mockito.any(
|
||||
Path.class))).thenReturn(existsRetVal);
|
||||
store.init(conf, mgr);
|
||||
Mockito.verify(store.getFs(),Mockito.times(
|
||||
Mockito.verify(store.getFs(), Mockito.times(
|
||||
expectedNumOfCalls)).mkdirs(Mockito.any(Path
|
||||
.class));
|
||||
}
|
||||
|
|
|
@ -20,12 +20,16 @@ package org.apache.hadoop.yarn.nodelabels;
|
|||
import static org.junit.Assert.fail;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test class to verify node label util ops.
|
||||
*/
|
||||
public class TestNodeLabelUtil {
|
||||
|
||||
@Test
|
||||
public void testAttributeValueAddition() {
|
||||
String[] values =
|
||||
new String[] {"1_8", "1.8", "ABZ", "ABZ", "az", "a-z","a_z", "123456789"};
|
||||
new String[]{"1_8", "1.8", "ABZ", "ABZ", "az", "a-z", "a_z",
|
||||
"123456789"};
|
||||
for (String val : values) {
|
||||
try {
|
||||
NodeLabelUtil.checkAndThrowAttributeValue(val);
|
||||
|
@ -34,7 +38,7 @@ public class TestNodeLabelUtil {
|
|||
}
|
||||
}
|
||||
|
||||
String[] invalidVals = new String[] {"_18","1,8","1/5",".15","1\\5"};
|
||||
String[] invalidVals = new String[]{"_18", "1,8", "1/5", ".15", "1\\5"};
|
||||
for (String val : invalidVals) {
|
||||
try {
|
||||
NodeLabelUtil.checkAndThrowAttributeValue(val);
|
||||
|
|
|
@ -189,8 +189,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
|
||||
private HashSet<ApplicationId> applicationMap = new HashSet<>();
|
||||
private HashSet<ApplicationId> keepContainerOnUams = new HashSet<>();
|
||||
private HashMap<ApplicationAttemptId, List<ContainerId>> applicationContainerIdMap =
|
||||
new HashMap<ApplicationAttemptId, List<ContainerId>>();
|
||||
private HashMap<ApplicationAttemptId,
|
||||
List<ContainerId>> applicationContainerIdMap = new HashMap<>();
|
||||
private AtomicInteger containerIndex = new AtomicInteger(0);
|
||||
private Configuration conf;
|
||||
private int subClusterId;
|
||||
|
@ -495,7 +495,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
if (request.getApplicationSubmissionContext().getUnmanagedAM()
|
||||
|| request.getApplicationSubmissionContext()
|
||||
.getKeepContainersAcrossApplicationAttempts()) {
|
||||
keepContainerOnUams.add(appId);
|
||||
keepContainerOnUams.add(appId);
|
||||
}
|
||||
return SubmitApplicationResponse.newInstance();
|
||||
}
|
||||
|
@ -921,8 +921,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
}
|
||||
|
||||
@Override
|
||||
public NodesToAttributesMappingResponse mapAttributesToNodes(NodesToAttributesMappingRequest request)
|
||||
throws YarnException, IOException {
|
||||
public NodesToAttributesMappingResponse mapAttributesToNodes(
|
||||
NodesToAttributesMappingRequest request)
|
||||
throws YarnException, IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
|
|||
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Abstract class which will be responsible for fetching the node attributes.
|
||||
*
|
||||
|
|
|
@ -180,6 +180,9 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A dummy NodeLabelsProvider class for tests.
|
||||
*/
|
||||
public static class DummyNodeLabelsProvider extends NodeLabelsProvider {
|
||||
|
||||
public DummyNodeLabelsProvider() {
|
||||
|
|
|
@ -515,7 +515,6 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
* 4. Send healthStatus to RMNode
|
||||
* 5. Update node's labels if distributed Node Labels configuration is enabled
|
||||
*/
|
||||
|
||||
NodeId nodeId = remoteNodeStatus.getNodeId();
|
||||
|
||||
// 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
|
||||
|
|
|
@ -163,8 +163,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|||
StringBuilder logMsg = new StringBuilder(op.name());
|
||||
logMsg.append(" attributes on nodes:");
|
||||
// do update labels from nodes
|
||||
for (Entry<String, Map<NodeAttribute, AttributeValue>> entry : nodeAttributeMapping
|
||||
.entrySet()) {
|
||||
for (Entry<String, Map<NodeAttribute, AttributeValue>> entry :
|
||||
nodeAttributeMapping.entrySet()) {
|
||||
String nodeHost = entry.getKey();
|
||||
Map<NodeAttribute, AttributeValue> attributes = entry.getValue();
|
||||
|
||||
|
@ -400,7 +400,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<NodeAttributeKey, Map<String, AttributeValue>> getAttributesToNodes(
|
||||
public Map<NodeAttributeKey,
|
||||
Map<String, AttributeValue>> getAttributesToNodes(
|
||||
Set<NodeAttributeKey> attributes) {
|
||||
try {
|
||||
readLock.lock();
|
||||
|
|
|
@ -41,7 +41,8 @@ public class NodeAttributesStoreEvent
|
|||
this.operation = operation;
|
||||
}
|
||||
|
||||
public Map<String, Map<NodeAttribute, AttributeValue>> getNodeAttributeMappingList() {
|
||||
public Map<String,
|
||||
Map<NodeAttribute, AttributeValue>> getNodeAttributeMappingList() {
|
||||
return nodeAttributeMapping;
|
||||
}
|
||||
|
||||
|
|
|
@ -285,7 +285,7 @@ public class MockNodes {
|
|||
public Map<String, Long> getAllocationTagsWithCount() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
public void setNodeAttributes(String prefix,
|
||||
Set<NodeAttribute> nodeAttributes) {
|
||||
|
||||
|
|
|
@ -36,6 +36,9 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Test class for FileSystemNodeAttributeStore.
|
||||
*/
|
||||
public class TestFileSystemNodeAttributeStore {
|
||||
|
||||
private MockNodeAttrbuteManager mgr = null;
|
||||
|
|
Loading…
Reference in New Issue