YARN-6585. RM fails to start when upgrading from 2.7 for clusters with node labels. Contributed by Sunil G.
(cherry picked from commit 5578af8603
)
This commit is contained in:
parent
7eee4fed89
commit
9849b5a489
|
@ -111,6 +111,13 @@ public class AddToClusterNodeLabelsRequestPBImpl extends
|
||||||
for (NodeLabelProto r : attributesProtoList) {
|
for (NodeLabelProto r : attributesProtoList) {
|
||||||
this.updatedNodeLabels.add(convertFromProtoFormat(r));
|
this.updatedNodeLabels.add(convertFromProtoFormat(r));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.updatedNodeLabels.isEmpty()) {
|
||||||
|
List<String> deprecatedLabelsList = p.getDeprecatedNodeLabelsList();
|
||||||
|
for (String l : deprecatedLabelsList) {
|
||||||
|
this.updatedNodeLabels.add(NodeLabel.newInstance(l));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private NodeLabel convertFromProtoFormat(NodeLabelProto p) {
|
private NodeLabel convertFromProtoFormat(NodeLabelProto p) {
|
||||||
|
|
|
@ -26,7 +26,9 @@ import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -46,12 +48,17 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||||
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.DecommissionType;
|
import org.apache.hadoop.yarn.api.records.DecommissionType;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||||
|
@ -62,6 +69,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
@ -75,6 +83,8 @@ import org.junit.Test;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
|
||||||
|
|
||||||
public class TestRMAdminService {
|
public class TestRMAdminService {
|
||||||
|
|
||||||
|
@ -1362,4 +1372,52 @@ public class TestRMAdminService {
|
||||||
base = base * 2;
|
base = base * 2;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testAdminAddToClusterNodeLabelsWithDeprecatedAPIs()
|
||||||
|
throws Exception, YarnException {
|
||||||
|
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||||
|
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
|
||||||
|
|
||||||
|
uploadDefaultConfiguration();
|
||||||
|
|
||||||
|
rm = new MockRM(configuration) {
|
||||||
|
protected ClientRMService createClientRMService() {
|
||||||
|
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
|
||||||
|
this.applicationACLsManager, this.queueACLsManager,
|
||||||
|
this.getRMContext().getRMDelegationTokenSecretManager());
|
||||||
|
};
|
||||||
|
};
|
||||||
|
rm.init(configuration);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
List<String> list = new ArrayList<String>();
|
||||||
|
list.add("a");
|
||||||
|
list.add("b");
|
||||||
|
AddToClusterNodeLabelsRequestProto proto = AddToClusterNodeLabelsRequestProto
|
||||||
|
.newBuilder().addAllDeprecatedNodeLabels(list).build();
|
||||||
|
AddToClusterNodeLabelsRequestPBImpl protoImpl = new AddToClusterNodeLabelsRequestPBImpl(
|
||||||
|
proto);
|
||||||
|
rm.adminService
|
||||||
|
.addToClusterNodeLabels((AddToClusterNodeLabelsRequest) protoImpl);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
fail("Could not update node labels." + ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a client.
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
|
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
|
||||||
|
ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
|
||||||
|
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
|
||||||
|
|
||||||
|
// Get node labels collection
|
||||||
|
GetClusterNodeLabelsResponse response = client
|
||||||
|
.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance());
|
||||||
|
NodeLabel labelX = NodeLabel.newInstance("a");
|
||||||
|
NodeLabel labelY = NodeLabel.newInstance("b");
|
||||||
|
Assert.assertTrue(
|
||||||
|
response.getNodeLabelList().containsAll(Arrays.asList(labelX, labelY)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -704,4 +705,52 @@ public class TestRMNodeLabelsManager extends NodeLabelTestBase {
|
||||||
assertLabelsToNodesEquals(
|
assertLabelsToNodesEquals(
|
||||||
mgr.getLabelsToNodes(), transposeNodeToLabels(mgr.getNodeLabels()));
|
mgr.getLabelsToNodes(), transposeNodeToLabels(mgr.getNodeLabels()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testBackwardsCompatableMirror() throws Exception {
|
||||||
|
lmgr = new RMNodeLabelsManager();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
File tempDir = File.createTempFile("nlb", ".tmp");
|
||||||
|
tempDir.delete();
|
||||||
|
tempDir.mkdirs();
|
||||||
|
tempDir.deleteOnExit();
|
||||||
|
String tempDirName = tempDir.getAbsolutePath();
|
||||||
|
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR, tempDirName);
|
||||||
|
|
||||||
|
// The following are the contents of a 2.7-formatted levelDB file to be
|
||||||
|
// placed in nodelabel.mirror. There are 3 labels: 'a', 'b', and 'c'.
|
||||||
|
// host1 is labeled with 'a', host2 is labeled with 'b', and c is not
|
||||||
|
// associated with a node.
|
||||||
|
byte[] contents =
|
||||||
|
{
|
||||||
|
0x09, 0x0A, 0x01, 0x61, 0x0A, 0x01, 0x62, 0x0A, 0x01, 0x63, 0x20,
|
||||||
|
0x0A, 0x0E, 0x0A, 0x09, 0x0A, 0x05, 0x68, 0x6F, 0x73, 0x74, 0x32,
|
||||||
|
0x10, 0x00, 0x12, 0x01, 0x62, 0x0A, 0x0E, 0x0A, 0x09, 0x0A, 0x05,
|
||||||
|
0x68, 0x6F, 0x73, 0x74, 0x31, 0x10, 0x00, 0x12, 0x01, 0x61
|
||||||
|
};
|
||||||
|
File file = new File(tempDirName + "/nodelabel.mirror");
|
||||||
|
file.createNewFile();
|
||||||
|
FileOutputStream stream = new FileOutputStream(file);
|
||||||
|
stream.write(contents);
|
||||||
|
stream.close();
|
||||||
|
|
||||||
|
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||||
|
conf.set(YarnConfiguration.RM_SCHEDULER,
|
||||||
|
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
|
||||||
|
Configuration withQueueLabels = getConfigurationWithQueueLabels(conf);
|
||||||
|
|
||||||
|
MockRM rm = initRM(withQueueLabels);
|
||||||
|
Set<String> labelNames = lmgr.getClusterNodeLabelNames();
|
||||||
|
Map<String, Set<NodeId>> labeledNodes = lmgr.getLabelsToNodes();
|
||||||
|
|
||||||
|
Assert.assertTrue(labelNames.contains("a"));
|
||||||
|
Assert.assertTrue(labelNames.contains("b"));
|
||||||
|
Assert.assertTrue(labelNames.contains("c"));
|
||||||
|
Assert.assertTrue(labeledNodes.get("a")
|
||||||
|
.contains(NodeId.newInstance("host1", 0)));
|
||||||
|
Assert.assertTrue(labeledNodes.get("b")
|
||||||
|
.contains(NodeId.newInstance("host2", 0)));
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue