YARN-2740. Fix NodeLabelsManager to properly handle node label modifications when distributed node label configuration enabled. (Naganarasimha G R via wangda)
This commit is contained in:
parent
9fc32c5c4d
commit
db1b674b50
|
@ -265,6 +265,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-3530. ATS throws exception on trying to filter results without otherinfo.
|
||||
(zhijie shen via xgong)
|
||||
|
||||
YARN-2740. Fix NodeLabelsManager to properly handle node label modifications
|
||||
when distributed node label configuration enabled. (Naganarasimha G R via wangda)
|
||||
|
||||
Release 2.7.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -1779,6 +1779,12 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
|
||||
CENTALIZED_NODELABEL_CONFIGURATION_TYPE;
|
||||
|
||||
@Private
|
||||
public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
|
||||
return DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get(
|
||||
NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
|
||||
}
|
||||
|
||||
public YarnConfiguration() {
|
||||
super();
|
||||
}
|
||||
|
|
|
@ -97,6 +97,8 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
protected NodeLabelsStore store;
|
||||
private boolean nodeLabelsEnabled = false;
|
||||
|
||||
private boolean isDistributedNodeLabelConfiguration = false;
|
||||
|
||||
/**
|
||||
* A <code>Host</code> can have multiple <code>Node</code>s
|
||||
*/
|
||||
|
@ -213,6 +215,10 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
nodeLabelsEnabled =
|
||||
conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED,
|
||||
YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED);
|
||||
|
||||
isDistributedNodeLabelConfiguration =
|
||||
YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
|
||||
|
||||
if (nodeLabelsEnabled) {
|
||||
initNodeLabelStore(conf);
|
||||
}
|
||||
|
@ -223,7 +229,7 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
protected void initNodeLabelStore(Configuration conf) throws Exception {
|
||||
this.store = new FileSystemNodeLabelsStore(this);
|
||||
this.store.init(conf);
|
||||
this.store.recover();
|
||||
this.store.recover(isDistributedNodeLabelConfiguration);
|
||||
}
|
||||
|
||||
// for UT purpose
|
||||
|
@ -613,7 +619,10 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
if (null != dispatcher) {
|
||||
if (null != dispatcher && !isDistributedNodeLabelConfiguration) {
|
||||
// In case of DistributedNodeLabelConfiguration, no need to save the the
|
||||
// NodeLabels Mapping to the back-end store, as on RM restart/failover
|
||||
// NodeLabels are collected from NM through Register/Heartbeat again
|
||||
dispatcher.getEventHandler().handle(
|
||||
new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
|
||||
}
|
||||
|
@ -799,8 +808,10 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
readLock.lock();
|
||||
List<NodeLabel> nodeLabels = new ArrayList<>();
|
||||
for (RMNodeLabel label : labelCollections.values()) {
|
||||
nodeLabels.add(NodeLabel.newInstance(label.getLabelName(),
|
||||
label.getIsExclusive()));
|
||||
if (!label.getLabelName().equals(NO_LABEL)) {
|
||||
nodeLabels.add(NodeLabel.newInstance(label.getLabelName(),
|
||||
label.getIsExclusive()));
|
||||
}
|
||||
}
|
||||
return nodeLabels;
|
||||
} finally {
|
||||
|
@ -824,7 +835,6 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void checkAndThrowLabelName(String label) throws IOException {
|
||||
if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) {
|
||||
|
|
|
@ -154,8 +154,12 @@ public class FileSystemNodeLabelsStore extends NodeLabelsStore {
|
|||
ensureCloseEditlogFile();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.hadoop.yarn.nodelabels.NodeLabelsStore#recover(boolean)
|
||||
*/
|
||||
@Override
|
||||
public void recover() throws YarnException, IOException {
|
||||
public void recover(boolean ignoreNodeToLabelsMappings) throws YarnException,
|
||||
IOException {
|
||||
/*
|
||||
* Steps of recover
|
||||
* 1) Read from last mirror (from mirror or mirror.old)
|
||||
|
@ -222,7 +226,15 @@ public class FileSystemNodeLabelsStore extends NodeLabelsStore {
|
|||
new ReplaceLabelsOnNodeRequestPBImpl(
|
||||
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
|
||||
.getNodeToLabels();
|
||||
mgr.replaceLabelsOnNode(map);
|
||||
if (!ignoreNodeToLabelsMappings) {
|
||||
/*
|
||||
* In case of Distributed NodeLabels setup,
|
||||
* ignoreNodeToLabelsMappings will be set to true and recover will
|
||||
* be invoked. As RM will collect the node labels from NM through
|
||||
* registration/HB
|
||||
*/
|
||||
mgr.replaceLabelsOnNode(map);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,9 +56,18 @@ public abstract class NodeLabelsStore implements Closeable {
|
|||
throws IOException;
|
||||
|
||||
/**
|
||||
* Recover labels and node to labels mappings from store
|
||||
* Recover labels and node to labels mappings from store, but if
|
||||
* ignoreNodeToLabelsMappings is true then node to labels mappings should not
|
||||
* be recovered. In case of Distributed NodeLabels setup
|
||||
* ignoreNodeToLabelsMappings will be set to true and recover will be invoked
|
||||
* as RM will collect the node labels from NM through registration/HB
|
||||
*
|
||||
* @param ignoreNodeToLabelsMappings
|
||||
* @throws IOException
|
||||
* @throws YarnException
|
||||
*/
|
||||
public abstract void recover() throws IOException, YarnException;
|
||||
public abstract void recover(boolean ignoreNodeToLabelsMappings)
|
||||
throws IOException, YarnException;
|
||||
|
||||
public void init(Configuration conf) throws Exception {}
|
||||
|
||||
|
|
|
@ -39,7 +39,8 @@ public class DummyCommonNodeLabelsManager extends CommonNodeLabelsManager {
|
|||
this.store = new NodeLabelsStore(this) {
|
||||
|
||||
@Override
|
||||
public void recover() throws IOException {
|
||||
public void recover(boolean ignoreNodeToLabelsMappings)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -554,4 +554,29 @@ public class TestCommonNodeLabelsManager extends NodeLabelTestBase {
|
|||
Assert.assertTrue(expectedAddedLabelNames.contains(label.getName()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testReplaceLabelsOnNodeInDistributedMode() throws Exception {
|
||||
//create new DummyCommonNodeLabelsManager than the one got from @before
|
||||
mgr.stop();
|
||||
mgr = new DummyCommonNodeLabelsManager();
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||
conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||
YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);
|
||||
|
||||
mgr.init(conf);
|
||||
mgr.start();
|
||||
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p2", "p3"));
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
|
||||
Set<String> labelsByNode = mgr.getLabelsByNode(toNodeId("n1"));
|
||||
|
||||
Assert.assertNull(
|
||||
"Labels are not expected to be written to the NodeLabelStore",
|
||||
mgr.lastNodeToLabels);
|
||||
Assert.assertNotNull("Updated labels should be available from the Mgr",
|
||||
labelsByNode);
|
||||
Assert.assertTrue(labelsByNode.contains("p1"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -144,6 +144,40 @@ public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase {
|
|||
mgr.stop();
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Test(timeout = 10000)
|
||||
public void testRecoverWithDistributedNodeLabels() throws Exception {
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p2", "p3"));
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p4"));
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p5", "p6"));
|
||||
mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n1"), toSet("p1"),
|
||||
toNodeId("n2"), toSet("p2")));
|
||||
mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
|
||||
toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"),
|
||||
toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6")));
|
||||
|
||||
mgr.removeFromClusterNodeLabels(toSet("p1"));
|
||||
mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5"));
|
||||
mgr.stop();
|
||||
|
||||
mgr = new MockNodeLabelManager();
|
||||
Configuration cf = new Configuration(conf);
|
||||
cf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||
YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);
|
||||
mgr.init(cf);
|
||||
|
||||
// check variables
|
||||
Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
|
||||
Assert.assertTrue(mgr.getClusterNodeLabelNames().containsAll(
|
||||
Arrays.asList("p2", "p4", "p6")));
|
||||
|
||||
Assert.assertTrue("During recovery in distributed node-labels setup, "
|
||||
+ "node to labels mapping should not be recovered ", mgr
|
||||
.getNodeLabels().size() == 0);
|
||||
|
||||
mgr.stop();
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Test(timeout = 10000)
|
||||
public void testEditlogRecover() throws Exception {
|
||||
|
|
|
@ -112,6 +112,9 @@ public class AdminService extends CompositeService implements
|
|||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
@VisibleForTesting
|
||||
boolean isDistributedNodeLabelConfiguration = false;
|
||||
|
||||
public AdminService(ResourceManager rm, RMContext rmContext) {
|
||||
super(AdminService.class.getName());
|
||||
this.rm = rm;
|
||||
|
@ -141,6 +144,10 @@ public class AdminService extends CompositeService implements
|
|||
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)), UserGroupInformation
|
||||
.getCurrentUser());
|
||||
rmId = conf.get(YarnConfiguration.RM_HA_ID);
|
||||
|
||||
isDistributedNodeLabelConfiguration =
|
||||
YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
|
||||
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
|
@ -637,32 +644,35 @@ public class AdminService extends CompositeService implements
|
|||
@Override
|
||||
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||
RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||
String argName = "removeFromClusterNodeLabels";
|
||||
String operation = "removeFromClusterNodeLabels";
|
||||
final String msg = "remove labels.";
|
||||
UserGroupInformation user = checkAcls(argName);
|
||||
|
||||
checkRMStatus(user.getShortUserName(), argName, msg);
|
||||
UserGroupInformation user = checkAcls(operation);
|
||||
|
||||
checkRMStatus(user.getShortUserName(), operation, msg);
|
||||
|
||||
RemoveFromClusterNodeLabelsResponse response =
|
||||
recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class);
|
||||
try {
|
||||
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels());
|
||||
RMAuditLogger
|
||||
.logSuccess(user.getShortUserName(), argName, "AdminService");
|
||||
.logSuccess(user.getShortUserName(), operation, "AdminService");
|
||||
return response;
|
||||
} catch (IOException ioe) {
|
||||
throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
|
||||
throw logAndWrapException(ioe, user.getShortUserName(), operation, msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||
String argName = "replaceLabelsOnNode";
|
||||
String operation = "replaceLabelsOnNode";
|
||||
final String msg = "set node to labels.";
|
||||
UserGroupInformation user = checkAcls(argName);
|
||||
|
||||
checkRMStatus(user.getShortUserName(), argName, msg);
|
||||
checkAndThrowIfDistributedNodeLabelConfEnabled(operation);
|
||||
UserGroupInformation user = checkAcls(operation);
|
||||
|
||||
checkRMStatus(user.getShortUserName(), operation, msg);
|
||||
|
||||
ReplaceLabelsOnNodeResponse response =
|
||||
recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class);
|
||||
|
@ -670,30 +680,41 @@ public class AdminService extends CompositeService implements
|
|||
rmContext.getNodeLabelManager().replaceLabelsOnNode(
|
||||
request.getNodeToLabels());
|
||||
RMAuditLogger
|
||||
.logSuccess(user.getShortUserName(), argName, "AdminService");
|
||||
.logSuccess(user.getShortUserName(), operation, "AdminService");
|
||||
return response;
|
||||
} catch (IOException ioe) {
|
||||
throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
|
||||
throw logAndWrapException(ioe, user.getShortUserName(), operation, msg);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkRMStatus(String user, String argName, String msg)
|
||||
private void checkRMStatus(String user, String operation, String msg)
|
||||
throws StandbyException {
|
||||
if (!isRMActive()) {
|
||||
RMAuditLogger.logFailure(user, argName, "",
|
||||
RMAuditLogger.logFailure(user, operation, "",
|
||||
"AdminService", "ResourceManager is not active. Can not " + msg);
|
||||
throwStandbyException();
|
||||
}
|
||||
}
|
||||
|
||||
private YarnException logAndWrapException(Exception exception, String user,
|
||||
String argName, String msg) throws YarnException {
|
||||
String operation, String msg) throws YarnException {
|
||||
LOG.warn("Exception " + msg, exception);
|
||||
RMAuditLogger.logFailure(user, argName, "",
|
||||
RMAuditLogger.logFailure(user, operation, "",
|
||||
"AdminService", "Exception " + msg);
|
||||
return RPCUtil.getRemoteException(exception);
|
||||
}
|
||||
|
||||
private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation)
|
||||
throws YarnException {
|
||||
if (isDistributedNodeLabelConfiguration) {
|
||||
String msg =
|
||||
String.format("Error when invoke method=%s because of "
|
||||
+ "distributed node label configuration enabled.", operation);
|
||||
LOG.error(msg);
|
||||
throw RPCUtil.getRemoteException(new IOException(msg));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
|
||||
|
|
|
@ -104,7 +104,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
private int minAllocMb;
|
||||
private int minAllocVcores;
|
||||
|
||||
private boolean isDistributesNodeLabelsConf;
|
||||
private boolean isDistributedNodeLabelsConf;
|
||||
|
||||
static {
|
||||
resync.setNodeAction(NodeAction.RESYNC);
|
||||
|
@ -155,13 +155,8 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
|
||||
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
|
||||
|
||||
String nodeLabelConfigurationType =
|
||||
conf.get(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||
YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE);
|
||||
|
||||
isDistributesNodeLabelsConf =
|
||||
YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE
|
||||
.equals(nodeLabelConfigurationType);
|
||||
isDistributedNodeLabelsConf =
|
||||
YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
|
||||
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
@ -352,7 +347,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
|
||||
// Update node's labels to RM's NodeLabelManager.
|
||||
Set<String> nodeLabels = request.getNodeLabels();
|
||||
if (isDistributesNodeLabelsConf && nodeLabels != null) {
|
||||
if (isDistributedNodeLabelsConf && nodeLabels != null) {
|
||||
try {
|
||||
updateNodeLabelsFromNMReport(nodeLabels, nodeId);
|
||||
response.setAreNodeLabelsAcceptedByRM(true);
|
||||
|
@ -470,7 +465,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);
|
||||
|
||||
// 5. Update node's labels to RM's NodeLabelManager.
|
||||
if (isDistributesNodeLabelsConf && request.getNodeLabels() != null) {
|
||||
if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) {
|
||||
try {
|
||||
updateNodeLabelsFromNMReport(request.getNodeLabels(), nodeId);
|
||||
nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
|
||||
|
|
|
@ -149,6 +149,7 @@ import org.apache.hadoop.yarn.webapp.BadRequestException;
|
|||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
|
@ -165,6 +166,9 @@ public class RMWebServices {
|
|||
private final Configuration conf;
|
||||
private @Context HttpServletResponse response;
|
||||
|
||||
@VisibleForTesting
|
||||
boolean isDistributedNodeLabelConfiguration = false;
|
||||
|
||||
public final static String DELEGATION_TOKEN_HEADER =
|
||||
"Hadoop-YARN-RM-Delegation-Token";
|
||||
|
||||
|
@ -172,6 +176,19 @@ public class RMWebServices {
|
|||
public RMWebServices(final ResourceManager rm, Configuration conf) {
|
||||
this.rm = rm;
|
||||
this.conf = conf;
|
||||
isDistributedNodeLabelConfiguration =
|
||||
YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
|
||||
}
|
||||
|
||||
private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation)
|
||||
throws IOException {
|
||||
if (isDistributedNodeLabelConfiguration) {
|
||||
String msg =
|
||||
String.format("Error when invoke method=%s because of "
|
||||
+ "distributed node label configuration enabled.", operation);
|
||||
LOG.error(msg);
|
||||
throw new IOException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
RMWebServices(ResourceManager rm, Configuration conf,
|
||||
|
@ -816,38 +833,64 @@ public class RMWebServices {
|
|||
@POST
|
||||
@Path("/replace-node-to-labels")
|
||||
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||
public Response replaceLabelsOnNodes(
|
||||
final NodeToLabelsInfo newNodeToLabels,
|
||||
@Context HttpServletRequest hsr)
|
||||
throws IOException {
|
||||
public Response replaceLabelsOnNodes(final NodeToLabelsInfo newNodeToLabels,
|
||||
@Context HttpServletRequest hsr) throws IOException {
|
||||
Map<NodeId, Set<String>> nodeIdToLabels =
|
||||
new HashMap<NodeId, Set<String>>();
|
||||
|
||||
for (Map.Entry<String, NodeLabelsInfo> nitle : newNodeToLabels
|
||||
.getNodeToLabels().entrySet()) {
|
||||
nodeIdToLabels.put(
|
||||
ConverterUtils.toNodeIdWithDefaultPort(nitle.getKey()),
|
||||
new HashSet<String>(nitle.getValue().getNodeLabels()));
|
||||
}
|
||||
|
||||
return replaceLabelsOnNode(nodeIdToLabels, hsr, "/replace-node-to-labels");
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("/nodes/{nodeId}/replace-labels")
|
||||
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||
public Response replaceLabelsOnNode(NodeLabelsInfo newNodeLabelsInfo,
|
||||
@Context HttpServletRequest hsr, @PathParam("nodeId") String nodeId)
|
||||
throws Exception {
|
||||
NodeId nid = ConverterUtils.toNodeIdWithDefaultPort(nodeId);
|
||||
Map<NodeId, Set<String>> newLabelsForNode =
|
||||
new HashMap<NodeId, Set<String>>();
|
||||
newLabelsForNode.put(nid,
|
||||
new HashSet<String>(newNodeLabelsInfo.getNodeLabels()));
|
||||
|
||||
return replaceLabelsOnNode(newLabelsForNode, hsr, "/nodes/nodeid/replace-labels");
|
||||
}
|
||||
|
||||
private Response replaceLabelsOnNode(
|
||||
Map<NodeId, Set<String>> newLabelsForNode, HttpServletRequest hsr,
|
||||
String operation) throws IOException {
|
||||
init();
|
||||
|
||||
|
||||
checkAndThrowIfDistributedNodeLabelConfEnabled("replaceLabelsOnNode");
|
||||
|
||||
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||
if (callerUGI == null) {
|
||||
String msg = "Unable to obtain user name, user not authenticated for"
|
||||
+ " post to .../replace-node-to-labels";
|
||||
String msg =
|
||||
"Unable to obtain user name, user not authenticated for"
|
||||
+ " post to ..." + operation;
|
||||
throw new AuthorizationException(msg);
|
||||
}
|
||||
if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) {
|
||||
String msg = "User " + callerUGI.getShortUserName() + " not authorized"
|
||||
+ " for post to .../replace-node-to-labels ";
|
||||
throw new AuthorizationException(msg);
|
||||
}
|
||||
|
||||
Map<NodeId, Set<String>> nodeIdToLabels =
|
||||
new HashMap<NodeId, Set<String>>();
|
||||
|
||||
for (Map.Entry<String, NodeLabelsInfo> nitle :
|
||||
newNodeToLabels.getNodeToLabels().entrySet()) {
|
||||
nodeIdToLabels.put(ConverterUtils.toNodeIdWithDefaultPort(nitle.getKey()),
|
||||
new HashSet<String>(nitle.getValue().getNodeLabels()));
|
||||
if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) {
|
||||
String msg =
|
||||
"User " + callerUGI.getShortUserName() + " not authorized"
|
||||
+ " for post to ..." + operation;
|
||||
throw new AuthorizationException(msg);
|
||||
}
|
||||
|
||||
rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(nodeIdToLabels);
|
||||
|
||||
rm.getRMContext().getNodeLabelManager()
|
||||
.replaceLabelsOnNode(newLabelsForNode);
|
||||
|
||||
return Response.status(Status.OK).build();
|
||||
}
|
||||
|
||||
|
||||
@GET
|
||||
@Path("/get-node-labels")
|
||||
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||
|
@ -897,7 +940,7 @@ public class RMWebServices {
|
|||
@Context HttpServletRequest hsr)
|
||||
throws Exception {
|
||||
init();
|
||||
|
||||
|
||||
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||
if (callerUGI == null) {
|
||||
String msg = "Unable to obtain user name, user not authenticated for"
|
||||
|
@ -931,40 +974,6 @@ public class RMWebServices {
|
|||
rm.getRMContext().getNodeLabelManager().getLabelsOnNode(nid));
|
||||
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("/nodes/{nodeId}/replace-labels")
|
||||
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||
public Response replaceLabelsOnNode(NodeLabelsInfo newNodeLabelsInfo,
|
||||
@Context HttpServletRequest hsr, @PathParam("nodeId") String nodeId)
|
||||
throws Exception {
|
||||
init();
|
||||
|
||||
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||
if (callerUGI == null) {
|
||||
String msg = "Unable to obtain user name, user not authenticated for"
|
||||
+ " post to .../nodes/nodeid/replace-labels";
|
||||
throw new AuthorizationException(msg);
|
||||
}
|
||||
|
||||
if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) {
|
||||
String msg = "User " + callerUGI.getShortUserName() + " not authorized"
|
||||
+ " for post to .../nodes/nodeid/replace-labels";
|
||||
throw new AuthorizationException(msg);
|
||||
}
|
||||
|
||||
NodeId nid = ConverterUtils.toNodeIdWithDefaultPort(nodeId);
|
||||
|
||||
Map<NodeId, Set<String>> newLabelsForNode = new HashMap<NodeId,
|
||||
Set<String>>();
|
||||
|
||||
newLabelsForNode.put(nid, new HashSet<String>(newNodeLabelsInfo.getNodeLabels()));
|
||||
|
||||
rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(newLabelsForNode);
|
||||
|
||||
return Response.status(Status.OK).build();
|
||||
|
||||
}
|
||||
|
||||
protected Response killApp(RMApp app, UserGroupInformation callerUGI,
|
||||
HttpServletRequest hsr) throws IOException, InterruptedException {
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
|
@ -44,6 +45,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
|
|||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||
import org.apache.hadoop.yarn.api.records.DecommissionType;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -53,6 +55,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.junit.After;
|
||||
|
@ -60,6 +65,8 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
public class TestRMAdminService {
|
||||
|
||||
|
@ -754,6 +761,67 @@ public class TestRMAdminService {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testModifyLabelsOnNodesWithDistributedConfigurationDisabled()
|
||||
throws IOException, YarnException {
|
||||
// create RM and set it's ACTIVE
|
||||
MockRM rm = new MockRM();
|
||||
((RMContextImpl) rm.getRMContext())
|
||||
.setHAServiceState(HAServiceState.ACTIVE);
|
||||
RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager();
|
||||
|
||||
// by default, distributed configuration for node label is disabled, this
|
||||
// should pass
|
||||
labelMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
|
||||
rm.adminService.replaceLabelsOnNode(ReplaceLabelsOnNodeRequest
|
||||
.newInstance(ImmutableMap.of(NodeId.newInstance("host", 0),
|
||||
(Set<String>) ImmutableSet.of("x"))));
|
||||
rm.close();
|
||||
}
|
||||
|
||||
@Test(expected = YarnException.class)
|
||||
public void testModifyLabelsOnNodesWithDistributedConfigurationEnabled()
|
||||
throws IOException, YarnException {
|
||||
// create RM and set it's ACTIVE, and set distributed node label
|
||||
// configuration to true
|
||||
MockRM rm = new MockRM();
|
||||
rm.adminService.isDistributedNodeLabelConfiguration = true;
|
||||
|
||||
((RMContextImpl) rm.getRMContext())
|
||||
.setHAServiceState(HAServiceState.ACTIVE);
|
||||
RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager();
|
||||
|
||||
// by default, distributed configuration for node label is disabled, this
|
||||
// should pass
|
||||
labelMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
|
||||
rm.adminService.replaceLabelsOnNode(ReplaceLabelsOnNodeRequest
|
||||
.newInstance(ImmutableMap.of(NodeId.newInstance("host", 0),
|
||||
(Set<String>) ImmutableSet.of("x"))));
|
||||
rm.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveClusterNodeLabelsWithDistributedConfigurationEnabled()
|
||||
throws IOException, YarnException {
|
||||
// create RM and set it's ACTIVE
|
||||
MockRM rm = new MockRM();
|
||||
((RMContextImpl) rm.getRMContext())
|
||||
.setHAServiceState(HAServiceState.ACTIVE);
|
||||
RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager();
|
||||
rm.adminService.isDistributedNodeLabelConfiguration = true;
|
||||
|
||||
// by default, distributed configuration for node label is disabled, this
|
||||
// should pass
|
||||
labelMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
|
||||
rm.adminService
|
||||
.removeFromClusterNodeLabels(RemoveFromClusterNodeLabelsRequest
|
||||
.newInstance((Set<String>) ImmutableSet.of("x")));
|
||||
|
||||
Set<String> clusterNodeLabels = labelMgr.getClusterNodeLabelNames();
|
||||
assertEquals(1,clusterNodeLabels.size());
|
||||
rm.close();
|
||||
}
|
||||
|
||||
private String writeConfigurationXML(Configuration conf, String confXMLName)
|
||||
throws IOException {
|
||||
DataOutputStream output = null;
|
||||
|
|
|
@ -40,7 +40,8 @@ public class NullRMNodeLabelsManager extends RMNodeLabelsManager {
|
|||
this.store = new NodeLabelsStore(this) {
|
||||
|
||||
@Override
|
||||
public void recover() throws IOException {
|
||||
public void recover(boolean ignoreNodeToLabelsMappings)
|
||||
throws IOException {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
|
@ -623,6 +624,7 @@ public class TestRMWebServices extends JerseyTestBase {
|
|||
null, null, null, null, null);
|
||||
when(mockRM.getRMContext()).thenReturn(rmContext);
|
||||
when(mockRM.getClientRMService()).thenReturn(mockClientSvc);
|
||||
rmContext.setNodeLabelManager(mock(RMNodeLabelsManager.class));
|
||||
|
||||
RMWebServices webSvc = new RMWebServices(mockRM, new Configuration(),
|
||||
mock(HttpServletResponse.class));
|
||||
|
|
|
@ -19,10 +19,10 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
@ -51,7 +51,6 @@ import com.sun.jersey.api.client.ClientResponse;
|
|||
import com.sun.jersey.api.client.WebResource;
|
||||
import com.sun.jersey.api.json.JSONJAXBContext;
|
||||
import com.sun.jersey.api.json.JSONMarshaller;
|
||||
import com.sun.jersey.api.json.JSONUnmarshaller;
|
||||
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
||||
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||
import com.sun.jersey.test.framework.WebAppDescriptor;
|
||||
|
@ -66,13 +65,13 @@ public class TestRMWebServicesNodeLabels extends JerseyTestBase {
|
|||
|
||||
private String userName;
|
||||
private String notUserName;
|
||||
private RMWebServices rmWebService;
|
||||
|
||||
private Injector injector = Guice.createInjector(new ServletModule() {
|
||||
|
||||
@Override
|
||||
protected void configureServlets() {
|
||||
bind(JAXBContextResolver.class);
|
||||
bind(RMWebServices.class);
|
||||
bind(GenericExceptionHandler.class);
|
||||
try {
|
||||
userName = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
} catch (IOException ioe) {
|
||||
|
@ -83,6 +82,9 @@ public class TestRMWebServicesNodeLabels extends JerseyTestBase {
|
|||
conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, userName);
|
||||
rm = new MockRM(conf);
|
||||
rmWebService = new RMWebServices(rm,conf);
|
||||
bind(RMWebServices.class).toInstance(rmWebService);
|
||||
bind(GenericExceptionHandler.class);
|
||||
bind(ResourceManager.class).toInstance(rm);
|
||||
filter("/*").through(
|
||||
TestRMWebServicesAppsModification.TestRMCustomAuthFilter.class);
|
||||
|
@ -113,7 +115,6 @@ public class TestRMWebServicesNodeLabels extends JerseyTestBase {
|
|||
ClientResponse response;
|
||||
JSONObject json;
|
||||
JSONArray jarr;
|
||||
String responseString;
|
||||
|
||||
// Add a label
|
||||
response =
|
||||
|
@ -386,6 +387,93 @@ public class TestRMWebServicesNodeLabels extends JerseyTestBase {
|
|||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||
String res = response.getEntity(String.class);
|
||||
assertTrue(res.equals("null"));
|
||||
|
||||
// Following test cases are to test replace when distributed node label
|
||||
// configuration is on
|
||||
// Reset for testing : add cluster labels
|
||||
response =
|
||||
r.path("ws")
|
||||
.path("v1")
|
||||
.path("cluster")
|
||||
.path("add-node-labels")
|
||||
.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity("{\"nodeLabels\":[\"x\",\"y\"]}",
|
||||
MediaType.APPLICATION_JSON).post(ClientResponse.class);
|
||||
// Reset for testing : Add labels to a node
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster").path("nodes").path("nid:0")
|
||||
.path("replace-labels").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity("{\"nodeLabels\": [\"y\"]}", MediaType.APPLICATION_JSON)
|
||||
.post(ClientResponse.class);
|
||||
LOG.info("posted node nodelabel");
|
||||
|
||||
//setting rmWebService for Distributed NodeLabel Configuration
|
||||
rmWebService.isDistributedNodeLabelConfiguration = true;
|
||||
|
||||
// Case1 : Replace labels using node-to-labels
|
||||
ntli = new NodeToLabelsInfo();
|
||||
nli = new NodeLabelsInfo();
|
||||
nli.getNodeLabels().add("x");
|
||||
ntli.getNodeToLabels().put("nid:0", nli);
|
||||
response =
|
||||
r.path("ws")
|
||||
.path("v1")
|
||||
.path("cluster")
|
||||
.path("replace-node-to-labels")
|
||||
.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(toJson(ntli, NodeToLabelsInfo.class),
|
||||
MediaType.APPLICATION_JSON).post(ClientResponse.class);
|
||||
|
||||
// Verify, using node-to-labels that previous operation has failed
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster").path("get-node-to-labels")
|
||||
.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||
ntli = response.getEntity(NodeToLabelsInfo.class);
|
||||
nli = ntli.getNodeToLabels().get("nid:0");
|
||||
assertEquals(1, nli.getNodeLabels().size());
|
||||
assertFalse(nli.getNodeLabels().contains("x"));
|
||||
|
||||
// Case2 : failure to Replace labels using replace-labels
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster").path("nodes").path("nid:0")
|
||||
.path("replace-labels").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity("{\"nodeLabels\": [\"x\"]}", MediaType.APPLICATION_JSON)
|
||||
.post(ClientResponse.class);
|
||||
LOG.info("posted node nodelabel");
|
||||
|
||||
// Verify, using node-to-labels that previous operation has failed
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster").path("get-node-to-labels")
|
||||
.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||
ntli = response.getEntity(NodeToLabelsInfo.class);
|
||||
nli = ntli.getNodeToLabels().get("nid:0");
|
||||
assertEquals(1, nli.getNodeLabels().size());
|
||||
assertFalse(nli.getNodeLabels().contains("x"));
|
||||
|
||||
// Case3 : Remove cluster label should be successfull
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("remove-node-labels")
|
||||
.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity("{\"nodeLabels\":\"x\"}", MediaType.APPLICATION_JSON)
|
||||
.post(ClientResponse.class);
|
||||
// Verify
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("get-node-labels").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
assertEquals("y", json.getString("nodeLabels"));
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
|
@ -396,13 +484,4 @@ public class TestRMWebServicesNodeLabels extends JerseyTestBase {
|
|||
jm.marshallToJSON(nsli, sw);
|
||||
return sw.toString();
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
private Object fromJson(String json, Class klass) throws Exception {
|
||||
StringReader sr = new StringReader(json);
|
||||
JSONJAXBContext ctx = new JSONJAXBContext(klass);
|
||||
JSONUnmarshaller jm = ctx.createJSONUnmarshaller();
|
||||
return jm.unmarshalFromJSON(sr, klass);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue