YARN-3971. Skip RMNodeLabelsManager#checkRemoveFromClusterNodeLabelsOfQueue on nodelabel recovery. (addendum patch). Contributed by Bibin A chundatt
(cherry picked from commit b4078bd17b
)
This commit is contained in:
parent
f33f0b697b
commit
155f25061e
|
@ -75,7 +75,9 @@ public class CommonNodeLabelsManager extends AbstractService {
|
||||||
private static final Pattern LABEL_PATTERN = Pattern
|
private static final Pattern LABEL_PATTERN = Pattern
|
||||||
.compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*");
|
.compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*");
|
||||||
public static final int WILDCARD_PORT = 0;
|
public static final int WILDCARD_PORT = 0;
|
||||||
|
// Flag to identify startup for removelabel
|
||||||
|
private boolean initNodeLabelStoreInProgress = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Error messages
|
* Error messages
|
||||||
*/
|
*/
|
||||||
|
@ -226,6 +228,13 @@ public class CommonNodeLabelsManager extends AbstractService {
|
||||||
labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL));
|
labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the isStartup
|
||||||
|
*/
|
||||||
|
protected boolean isInitNodeLabelStoreInProgress() {
|
||||||
|
return initNodeLabelStoreInProgress;
|
||||||
|
}
|
||||||
|
|
||||||
boolean isCentralizedConfiguration() {
|
boolean isCentralizedConfiguration() {
|
||||||
return isCentralizedNodeLabelConfiguration;
|
return isCentralizedNodeLabelConfiguration;
|
||||||
}
|
}
|
||||||
|
@ -252,7 +261,9 @@ public class CommonNodeLabelsManager extends AbstractService {
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
if (nodeLabelsEnabled) {
|
if (nodeLabelsEnabled) {
|
||||||
|
setInitNodeLabelStoreInProgress(true);
|
||||||
initNodeLabelStore(getConfig());
|
initNodeLabelStore(getConfig());
|
||||||
|
setInitNodeLabelStoreInProgress(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// init dispatcher only when service start, because recover will happen in
|
// init dispatcher only when service start, because recover will happen in
|
||||||
|
@ -1083,4 +1094,9 @@ public class CommonNodeLabelsManager extends AbstractService {
|
||||||
}
|
}
|
||||||
return newMap;
|
return newMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setInitNodeLabelStoreInProgress(
|
||||||
|
boolean initNodeLabelStoreInProgress) {
|
||||||
|
this.initNodeLabelStoreInProgress = initNodeLabelStoreInProgress;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.Service;
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||||
|
@ -114,13 +113,13 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
if (getServiceState() == Service.STATE.STARTED) {
|
if (!isInitNodeLabelStoreInProgress()) {
|
||||||
// We cannot remove node labels from collection when some queue(s) are
|
// We cannot remove node labels from collection when some queue(s) are
|
||||||
// using any of them.
|
// using any of them.
|
||||||
// We will only do this check when service starting finished. Before
|
// We will not do remove when recovery is in prpgress. During
|
||||||
// service starting, we will replay edit logs and recover state. It is
|
// service starting, we will replay edit logs and recover state. It is
|
||||||
// possible that a history operation removed some labels which were being
|
// possible that a history operation removed some labels which were not
|
||||||
// used by some queues in the past but not used by current queues.
|
// used by some queues in the past but are used by current queues.
|
||||||
checkRemoveFromClusterNodeLabelsOfQueue(labelsToRemove);
|
checkRemoveFromClusterNodeLabelsOfQueue(labelsToRemove);
|
||||||
}
|
}
|
||||||
// copy before NMs
|
// copy before NMs
|
||||||
|
|
|
@ -33,7 +33,9 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
@ -44,6 +46,7 @@ import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
|
||||||
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
|
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
|
@ -606,16 +609,7 @@ public class TestRMNodeLabelsManager extends NodeLabelTestBase {
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testcheckRemoveFromClusterNodeLabelsOfQueue() throws Exception {
|
public void testcheckRemoveFromClusterNodeLabelsOfQueue() throws Exception {
|
||||||
class TestRMLabelManger extends RMNodeLabelsManager {
|
lmgr = new RMNodeLabelsManager();
|
||||||
@Override
|
|
||||||
protected void checkRemoveFromClusterNodeLabelsOfQueue(
|
|
||||||
Collection<String> labelsToRemove) throws IOException {
|
|
||||||
checkQueueCall = true;
|
|
||||||
// Do nothing
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
lmgr = new TestRMLabelManger();
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
File tempDir = File.createTempFile("nlb", ".tmp");
|
File tempDir = File.createTempFile("nlb", ".tmp");
|
||||||
tempDir.delete();
|
tempDir.delete();
|
||||||
|
@ -624,23 +618,60 @@ public class TestRMNodeLabelsManager extends NodeLabelTestBase {
|
||||||
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
|
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
|
||||||
tempDir.getAbsolutePath());
|
tempDir.getAbsolutePath());
|
||||||
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||||
|
conf.set(YarnConfiguration.RM_SCHEDULER,
|
||||||
|
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
|
||||||
|
Configuration withQueueLabels = getConfigurationWithQueueLabels(conf);
|
||||||
|
MockRM rm = initRM(conf);
|
||||||
|
lmgr.addToCluserNodeLabels(toSet(NodeLabel.newInstance("x", false)));
|
||||||
|
lmgr.removeFromClusterNodeLabels(Arrays.asList(new String[] { "x" }));
|
||||||
|
lmgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("x"));
|
||||||
|
rm.stop();
|
||||||
|
class TestRMLabelManger extends RMNodeLabelsManager {
|
||||||
|
@Override
|
||||||
|
protected void checkRemoveFromClusterNodeLabelsOfQueue(
|
||||||
|
Collection<String> labelsToRemove) throws IOException {
|
||||||
|
checkQueueCall = true;
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lmgr = new TestRMLabelManger();
|
||||||
|
MockRM rm2 = initRM(withQueueLabels);
|
||||||
|
Assert.assertFalse(
|
||||||
|
"checkRemoveFromClusterNodeLabelsOfQueue should not be called"
|
||||||
|
+ "on recovery",
|
||||||
|
checkQueueCall);
|
||||||
|
lmgr.removeFromClusterNodeLabels(Arrays.asList(new String[] { "x" }));
|
||||||
|
Assert
|
||||||
|
.assertTrue("checkRemoveFromClusterNodeLabelsOfQueue should be called "
|
||||||
|
+ "since its not recovery", checkQueueCall);
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private MockRM initRM(Configuration conf) {
|
||||||
MockRM rm = new MockRM(conf) {
|
MockRM rm = new MockRM(conf) {
|
||||||
@Override
|
@Override
|
||||||
public RMNodeLabelsManager createNodeLabelManager() {
|
public RMNodeLabelsManager createNodeLabelManager() {
|
||||||
return lmgr;
|
return lmgr;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
lmgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("a"));
|
|
||||||
lmgr.removeFromClusterNodeLabels(Arrays.asList(new String[] { "a" }));
|
|
||||||
rm.getRMContext().setNodeLabelManager(lmgr);
|
rm.getRMContext().setNodeLabelManager(lmgr);
|
||||||
rm.start();
|
rm.start();
|
||||||
lmgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("a"));
|
Assert.assertEquals(Service.STATE.STARTED, rm.getServiceState());
|
||||||
Assert.assertEquals(false, checkQueueCall);
|
return rm;
|
||||||
lmgr.removeFromClusterNodeLabels(Arrays.asList(new String[] { "a" }));
|
}
|
||||||
Assert.assertEquals(true, checkQueueCall);
|
|
||||||
lmgr.stop();
|
private Configuration getConfigurationWithQueueLabels(Configuration config) {
|
||||||
lmgr.close();
|
CapacitySchedulerConfiguration conf =
|
||||||
rm.stop();
|
new CapacitySchedulerConfiguration(config);
|
||||||
|
// Define top-level queues
|
||||||
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a" });
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
|
||||||
|
|
||||||
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
|
conf.setCapacity(A, 100);
|
||||||
|
conf.setAccessibleNodeLabels(A, ImmutableSet.of("x"));
|
||||||
|
conf.setCapacityByLabel(A, "x", 100);
|
||||||
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 5000)
|
@Test(timeout = 5000)
|
||||||
|
|
Loading…
Reference in New Issue