YARN-2880. Added a test to make sure node labels will be recovered if RM restart is enabled. Contributed by Rohith Sharmaks
(cherry picked from commit 73fbb3c66b
)
This commit is contained in:
parent
ee25c0d890
commit
58c971164c
|
@ -27,6 +27,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-2765. Added leveldb-based implementation for RMStateStore. (Jason Lowe
|
YARN-2765. Added leveldb-based implementation for RMStateStore. (Jason Lowe
|
||||||
via jianhe)
|
via jianhe)
|
||||||
|
|
||||||
|
YARN-2880. Added a test to make sure node labels will be recovered
|
||||||
|
if RM restart is enabled. (Rohith Sharmaks via jianhe)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-2891. Failed Container Executor does not provide a clear error
|
YARN-2891. Failed Container Executor does not provide a clear error
|
||||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
@ -82,6 +83,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
|
@ -105,6 +107,9 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
private final static File TEMP_DIR = new File(System.getProperty(
|
private final static File TEMP_DIR = new File(System.getProperty(
|
||||||
"test.build.data", "/tmp"), "decommision");
|
"test.build.data", "/tmp"), "decommision");
|
||||||
|
@ -2036,4 +2041,90 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test does following verification
|
||||||
|
// 1. Start RM1 with store patch /tmp
|
||||||
|
// 2. Add/remove/replace labels to cluster and node lable and verify
|
||||||
|
// 3. Start RM2 with store patch /tmp only
|
||||||
|
// 4. Get cluster and node lobel, it should be present by recovering it
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testRMRestartRecoveringNodeLabelManager() throws Exception {
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||||
|
mgr.init(getConfig());
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm1.init(conf);
|
||||||
|
rm1.start();
|
||||||
|
|
||||||
|
RMNodeLabelsManager nodeLabelManager =
|
||||||
|
rm1.getRMContext().getNodeLabelManager();
|
||||||
|
|
||||||
|
Set<String> clusterNodeLabels = new HashSet<String>();
|
||||||
|
clusterNodeLabels.add("x");
|
||||||
|
clusterNodeLabels.add("y");
|
||||||
|
clusterNodeLabels.add("z");
|
||||||
|
// Add node label x,y,z
|
||||||
|
nodeLabelManager.addToCluserNodeLabels(clusterNodeLabels);
|
||||||
|
|
||||||
|
// Add node Label to Node h1->x
|
||||||
|
NodeId n1 = NodeId.newInstance("h1", 0);
|
||||||
|
nodeLabelManager.addLabelsToNode(ImmutableMap.of(n1, toSet("x")));
|
||||||
|
|
||||||
|
clusterNodeLabels.remove("z");
|
||||||
|
// Remove cluster label z
|
||||||
|
nodeLabelManager.removeFromClusterNodeLabels(toSet("z"));
|
||||||
|
|
||||||
|
// Replace nodelabel h1->x,y
|
||||||
|
nodeLabelManager.replaceLabelsOnNode(ImmutableMap.of(n1, toSet("x", "y")));
|
||||||
|
|
||||||
|
// Wait for updating store.It is expected NodeStore update should happen
|
||||||
|
// very fast since it has separate dispatcher. So waiting for max 5 seconds,
|
||||||
|
// which is sufficient time to update NodeStore.
|
||||||
|
int count = 10;
|
||||||
|
while (count-- > 0) {
|
||||||
|
if (nodeLabelManager.getNodeLabels().size() > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(clusterNodeLabels.size(), nodeLabelManager
|
||||||
|
.getClusterNodeLabels().size());
|
||||||
|
|
||||||
|
Map<NodeId, Set<String>> nodeLabels = nodeLabelManager.getNodeLabels();
|
||||||
|
Assert.assertEquals(1, nodeLabelManager.getNodeLabels().size());
|
||||||
|
Assert.assertTrue(nodeLabels.get(n1).equals(toSet("x", "y")));
|
||||||
|
|
||||||
|
MockRM rm2 = new MockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||||
|
mgr.init(getConfig());
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm2.init(conf);
|
||||||
|
rm2.start();
|
||||||
|
|
||||||
|
nodeLabelManager = rm2.getRMContext().getNodeLabelManager();
|
||||||
|
Assert.assertEquals(clusterNodeLabels.size(), nodeLabelManager
|
||||||
|
.getClusterNodeLabels().size());
|
||||||
|
|
||||||
|
nodeLabels = nodeLabelManager.getNodeLabels();
|
||||||
|
Assert.assertEquals(1, nodeLabelManager.getNodeLabels().size());
|
||||||
|
Assert.assertTrue(nodeLabels.get(n1).equals(toSet("x", "y")));
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private <E> Set<E> toSet(E... elements) {
|
||||||
|
Set<E> set = Sets.newHashSet(elements);
|
||||||
|
return set;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue