YARN-3076. Add API/Implementation to YarnClient to retrieve label-to-node mapping (Varun Saxena via wangda)
(cherry picked from commit d49ae725d5
)
This commit is contained in:
parent
b1fc4ec57a
commit
cd5eb9c1de
|
@ -439,6 +439,18 @@ public class ResourceMgrDelegate extends YarnClient {
|
||||||
return client.getNodeToLabels();
|
return client.getNodeToLabels();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Set<NodeId>> getLabelsToNodes() throws YarnException,
|
||||||
|
IOException {
|
||||||
|
return client.getLabelsToNodes();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return client.getLabelsToNodes(labels);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getClusterNodeLabels()
|
public Set<String> getClusterNodeLabels()
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
|
|
|
@ -90,6 +90,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
||||||
|
@ -436,6 +438,12 @@ public class TestClientRedirect {
|
||||||
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
|
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetLabelsToNodesResponse getLabelsToNodes(
|
||||||
|
GetLabelsToNodesRequest request) throws YarnException, IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class HistoryService extends AMService implements HSClientProtocol {
|
class HistoryService extends AMService implements HSClientProtocol {
|
||||||
|
|
|
@ -268,6 +268,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA.
|
YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA.
|
||||||
(Tsuyoshi OZAWA via jianhe)
|
(Tsuyoshi OZAWA via jianhe)
|
||||||
|
|
||||||
|
YARN-3076. Add API/Implementation to YarnClient to retrieve label-to-node
|
||||||
|
mapping. (Varun Saxena via wangda)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
|
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
|
||||||
|
|
|
@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
||||||
|
@ -676,6 +678,22 @@ public interface ApplicationClientProtocol {
|
||||||
public GetNodesToLabelsResponse getNodeToLabels(
|
public GetNodesToLabelsResponse getNodeToLabels(
|
||||||
GetNodesToLabelsRequest request) throws YarnException, IOException;
|
GetNodesToLabelsRequest request) throws YarnException, IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* The interface used by client to get labels to nodes mappings
|
||||||
|
* in existing cluster
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param request
|
||||||
|
* @return labels to nodes mappings
|
||||||
|
* @throws YarnException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public GetLabelsToNodesResponse getLabelsToNodes(
|
||||||
|
GetLabelsToNodesRequest request) throws YarnException, IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* The interface used by client to get node labels in the cluster
|
* The interface used by client to get node labels in the cluster
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
public abstract class GetLabelsToNodesRequest {
|
||||||
|
|
||||||
|
public static GetLabelsToNodesRequest newInstance() {
|
||||||
|
return Records.newRecord(GetLabelsToNodesRequest.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static GetLabelsToNodesRequest newInstance(Set<String> nodeLabels) {
|
||||||
|
GetLabelsToNodesRequest request =
|
||||||
|
Records.newRecord(GetLabelsToNodesRequest.class);
|
||||||
|
request.setNodeLabels(nodeLabels);
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract void setNodeLabels(Set<String> nodeLabels);
|
||||||
|
|
||||||
|
public abstract Set<String> getNodeLabels();
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
public abstract class GetLabelsToNodesResponse {
|
||||||
|
public static GetLabelsToNodesResponse newInstance(
|
||||||
|
Map<String, Set<NodeId>> map) {
|
||||||
|
GetLabelsToNodesResponse response =
|
||||||
|
Records.newRecord(GetLabelsToNodesResponse.class);
|
||||||
|
response.setLabelsToNodes(map);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public abstract void setLabelsToNodes(Map<String, Set<NodeId>> map);
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public abstract Map<String, Set<NodeId>> getLabelsToNodes();
|
||||||
|
}
|
|
@ -53,5 +53,6 @@ service ApplicationClientProtocolService {
|
||||||
rpc updateReservation (ReservationUpdateRequestProto) returns (ReservationUpdateResponseProto);
|
rpc updateReservation (ReservationUpdateRequestProto) returns (ReservationUpdateResponseProto);
|
||||||
rpc deleteReservation (ReservationDeleteRequestProto) returns (ReservationDeleteResponseProto);
|
rpc deleteReservation (ReservationDeleteRequestProto) returns (ReservationDeleteResponseProto);
|
||||||
rpc getNodeToLabels (GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto);
|
rpc getNodeToLabels (GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto);
|
||||||
|
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
|
||||||
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
|
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
|
||||||
}
|
}
|
||||||
|
|
|
@ -238,6 +238,11 @@ message NodeIdToLabelsProto {
|
||||||
repeated string nodeLabels = 2;
|
repeated string nodeLabels = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message LabelsToNodeIdsProto {
|
||||||
|
optional string nodeLabels = 1;
|
||||||
|
repeated NodeIdProto nodeId = 2;
|
||||||
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////
|
||||||
////// From AM_RM_Protocol /////////////////////////////////////////////
|
////// From AM_RM_Protocol /////////////////////////////////////////////
|
||||||
////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -201,6 +201,14 @@ message GetNodesToLabelsResponseProto {
|
||||||
repeated NodeIdToLabelsProto nodeToLabels = 1;
|
repeated NodeIdToLabelsProto nodeToLabels = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message GetLabelsToNodesRequestProto {
|
||||||
|
repeated string nodeLabels = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetLabelsToNodesResponseProto {
|
||||||
|
repeated LabelsToNodeIdsProto labelsToNodes = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message GetClusterNodeLabelsRequestProto {
|
message GetClusterNodeLabelsRequestProto {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -596,6 +596,37 @@ public abstract class YarnClient extends AbstractService {
|
||||||
public abstract Map<NodeId, Set<String>> getNodeToLabels()
|
public abstract Map<NodeId, Set<String>> getNodeToLabels()
|
||||||
throws YarnException, IOException;
|
throws YarnException, IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* The interface used by client to get labels to nodes mapping
|
||||||
|
* in existing cluster
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return node to labels mappings
|
||||||
|
* @throws YarnException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract Map<String, Set<NodeId>> getLabelsToNodes()
|
||||||
|
throws YarnException, IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* The interface used by client to get labels to nodes mapping
|
||||||
|
* for specified labels in existing cluster
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param labels labels for which labels to nodes mapping has to be retrieved
|
||||||
|
* @return labels to nodes mappings for specific labels
|
||||||
|
* @throws YarnException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels)
|
||||||
|
throws YarnException, IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* The interface used by client to get node labels in the cluster
|
* The interface used by client to get node labels in the cluster
|
||||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
||||||
|
@ -777,6 +778,20 @@ public class YarnClientImpl extends YarnClient {
|
||||||
.getNodeToLabels();
|
.getNodeToLabels();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Set<NodeId>> getLabelsToNodes() throws YarnException,
|
||||||
|
IOException {
|
||||||
|
return rmClient.getLabelsToNodes(GetLabelsToNodesRequest.newInstance())
|
||||||
|
.getLabelsToNodes();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return rmClient.getLabelsToNodes(
|
||||||
|
GetLabelsToNodesRequest.newInstance(labels)).getLabelsToNodes();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getClusterNodeLabels() throws YarnException, IOException {
|
public Set<String> getClusterNodeLabels() throws YarnException, IOException {
|
||||||
return rmClient.getClusterNodeLabels(
|
return rmClient.getClusterNodeLabels(
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
@ -37,6 +38,7 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
@ -63,6 +65,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
|
||||||
|
@ -402,6 +406,32 @@ public class TestYarnClient {
|
||||||
client.stop();
|
client.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 10000)
|
||||||
|
public void testGetLabelsToNodes() throws YarnException, IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
final YarnClient client = new MockYarnClient();
|
||||||
|
client.init(conf);
|
||||||
|
client.start();
|
||||||
|
|
||||||
|
// Get labels to nodes mapping
|
||||||
|
Map<String, Set<NodeId>> expectedLabelsToNodes =
|
||||||
|
((MockYarnClient)client).getLabelsToNodesMap();
|
||||||
|
Map<String, Set<NodeId>> labelsToNodes = client.getLabelsToNodes();
|
||||||
|
Assert.assertEquals(labelsToNodes, expectedLabelsToNodes);
|
||||||
|
Assert.assertEquals(labelsToNodes.size(), 3);
|
||||||
|
|
||||||
|
// Get labels to nodes for selected labels
|
||||||
|
Set<String> setLabels = new HashSet<String>(Arrays.asList("x", "z"));
|
||||||
|
expectedLabelsToNodes =
|
||||||
|
((MockYarnClient)client).getLabelsToNodesMap(setLabels);
|
||||||
|
labelsToNodes = client.getLabelsToNodes(setLabels);
|
||||||
|
Assert.assertEquals(labelsToNodes, expectedLabelsToNodes);
|
||||||
|
Assert.assertEquals(labelsToNodes.size(), 2);
|
||||||
|
|
||||||
|
client.stop();
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
|
||||||
private static class MockYarnClient extends YarnClientImpl {
|
private static class MockYarnClient extends YarnClientImpl {
|
||||||
private ApplicationReport mockReport;
|
private ApplicationReport mockReport;
|
||||||
private List<ApplicationReport> reports;
|
private List<ApplicationReport> reports;
|
||||||
|
@ -422,6 +452,8 @@ public class TestYarnClient {
|
||||||
mock(GetContainersResponse.class);
|
mock(GetContainersResponse.class);
|
||||||
GetContainerReportResponse mockContainerResponse =
|
GetContainerReportResponse mockContainerResponse =
|
||||||
mock(GetContainerReportResponse.class);
|
mock(GetContainerReportResponse.class);
|
||||||
|
GetLabelsToNodesResponse mockLabelsToNodesResponse =
|
||||||
|
mock(GetLabelsToNodesResponse.class);
|
||||||
|
|
||||||
public MockYarnClient() {
|
public MockYarnClient() {
|
||||||
super();
|
super();
|
||||||
|
@ -458,6 +490,9 @@ public class TestYarnClient {
|
||||||
when(rmClient.getContainerReport(any(GetContainerReportRequest.class)))
|
when(rmClient.getContainerReport(any(GetContainerReportRequest.class)))
|
||||||
.thenReturn(mockContainerResponse);
|
.thenReturn(mockContainerResponse);
|
||||||
|
|
||||||
|
when(rmClient.getLabelsToNodes(any(GetLabelsToNodesRequest.class)))
|
||||||
|
.thenReturn(mockLabelsToNodesResponse);
|
||||||
|
|
||||||
historyClient = mock(AHSClient.class);
|
historyClient = mock(AHSClient.class);
|
||||||
|
|
||||||
} catch (YarnException e) {
|
} catch (YarnException e) {
|
||||||
|
@ -618,6 +653,44 @@ public class TestYarnClient {
|
||||||
return appReports;
|
return appReports;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Set<NodeId>> getLabelsToNodes()
|
||||||
|
throws YarnException, IOException {
|
||||||
|
when(mockLabelsToNodesResponse.getLabelsToNodes()).thenReturn(
|
||||||
|
getLabelsToNodesMap());
|
||||||
|
return super.getLabelsToNodes();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
when(mockLabelsToNodesResponse.getLabelsToNodes()).thenReturn(
|
||||||
|
getLabelsToNodesMap(labels));
|
||||||
|
return super.getLabelsToNodes(labels);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Set<NodeId>> getLabelsToNodesMap() {
|
||||||
|
Map<String, Set<NodeId>> map = new HashMap<String, Set<NodeId>>();
|
||||||
|
Set<NodeId> setNodeIds =
|
||||||
|
new HashSet<NodeId>(Arrays.asList(
|
||||||
|
NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0)));
|
||||||
|
map.put("x", setNodeIds);
|
||||||
|
map.put("y", setNodeIds);
|
||||||
|
map.put("z", setNodeIds);
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Set<NodeId>> getLabelsToNodesMap(Set<String> labels) {
|
||||||
|
Map<String, Set<NodeId>> map = new HashMap<String, Set<NodeId>>();
|
||||||
|
Set<NodeId> setNodeIds =
|
||||||
|
new HashSet<NodeId>(Arrays.asList(
|
||||||
|
NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0)));
|
||||||
|
for(String label : labels) {
|
||||||
|
map.put(label, setNodeIds);
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ApplicationAttemptReport> getApplicationAttempts(
|
public List<ApplicationAttemptReport> getApplicationAttempts(
|
||||||
ApplicationId appId) throws YarnException, IOException {
|
ApplicationId appId) throws YarnException, IOException {
|
||||||
|
|
|
@ -53,6 +53,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
||||||
|
@ -97,6 +99,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersRequestPB
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesRequestPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
|
||||||
|
@ -474,6 +478,21 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetLabelsToNodesResponse getLabelsToNodes(
|
||||||
|
GetLabelsToNodesRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
YarnServiceProtos.GetLabelsToNodesRequestProto requestProto =
|
||||||
|
((GetLabelsToNodesRequestPBImpl) request).getProto();
|
||||||
|
try {
|
||||||
|
return new GetLabelsToNodesResponsePBImpl(proxy.getLabelsToNodes(
|
||||||
|
null, requestProto));
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
RPCUtil.unwrapAndThrowException(e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
||||||
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
|
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||||
|
@ -73,6 +74,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersRequestPB
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesRequestPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
|
||||||
|
@ -114,6 +117,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerReportRequestP
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerReportResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerReportResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersResponseProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesRequestProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNodesToLabelsRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNodesToLabelsRequestProto;
|
||||||
|
@ -470,6 +475,22 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetLabelsToNodesResponseProto getLabelsToNodes(
|
||||||
|
RpcController controller, GetLabelsToNodesRequestProto proto)
|
||||||
|
throws ServiceException {
|
||||||
|
GetLabelsToNodesRequestPBImpl request =
|
||||||
|
new GetLabelsToNodesRequestPBImpl(proto);
|
||||||
|
try {
|
||||||
|
GetLabelsToNodesResponse response = real.getLabelsToNodes(request);
|
||||||
|
return ((GetLabelsToNodesResponsePBImpl) response).getProto();
|
||||||
|
} catch (YarnException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetClusterNodeLabelsResponseProto getClusterNodeLabels(
|
public GetClusterNodeLabelsResponseProto getClusterNodeLabels(
|
||||||
RpcController controller, GetClusterNodeLabelsRequestProto proto)
|
RpcController controller, GetClusterNodeLabelsRequestProto proto)
|
||||||
|
|
|
@ -0,0 +1,121 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesRequestProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesRequestProtoOrBuilder;
|
||||||
|
|
||||||
|
import com.google.protobuf.TextFormat;
|
||||||
|
|
||||||
|
public class GetLabelsToNodesRequestPBImpl extends GetLabelsToNodesRequest {
|
||||||
|
|
||||||
|
Set<String> nodeLabels = null;
|
||||||
|
|
||||||
|
GetLabelsToNodesRequestProto proto =
|
||||||
|
GetLabelsToNodesRequestProto.getDefaultInstance();
|
||||||
|
GetLabelsToNodesRequestProto.Builder builder = null;
|
||||||
|
boolean viaProto = false;
|
||||||
|
|
||||||
|
public GetLabelsToNodesRequestPBImpl() {
|
||||||
|
builder = GetLabelsToNodesRequestProto.newBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public GetLabelsToNodesRequestPBImpl(GetLabelsToNodesRequestProto proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GetLabelsToNodesRequestProto getProto() {
|
||||||
|
mergeLocalToProto();
|
||||||
|
proto = viaProto ? proto : builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToProto() {
|
||||||
|
if (viaProto)
|
||||||
|
maybeInitBuilder();
|
||||||
|
mergeLocalToBuilder();
|
||||||
|
proto = builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToBuilder() {
|
||||||
|
if (nodeLabels != null && !nodeLabels.isEmpty()) {
|
||||||
|
builder.clearNodeLabels();
|
||||||
|
builder.addAllNodeLabels(nodeLabels);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void maybeInitBuilder() {
|
||||||
|
if (viaProto || builder == null) {
|
||||||
|
builder = GetLabelsToNodesRequestProto.newBuilder(proto);
|
||||||
|
}
|
||||||
|
viaProto = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initNodeLabels() {
|
||||||
|
if (this.nodeLabels != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
GetLabelsToNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
List<String> nodeLabelsList = p.getNodeLabelsList();
|
||||||
|
this.nodeLabels = new HashSet<String>();
|
||||||
|
this.nodeLabels.addAll(nodeLabelsList);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> getNodeLabels() {
|
||||||
|
initNodeLabels();
|
||||||
|
return this.nodeLabels;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNodeLabels(Set<String> nodeLabels) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (nodeLabels == null)
|
||||||
|
builder.clearNodeLabels();
|
||||||
|
this.nodeLabels = nodeLabels;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return getProto().hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (other == null)
|
||||||
|
return false;
|
||||||
|
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||||
|
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return TextFormat.shortDebugString(getProto());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,184 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.LabelsToNodeIdsProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesResponseProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesResponseProtoOrBuilder;
|
||||||
|
|
||||||
|
public class GetLabelsToNodesResponsePBImpl extends
|
||||||
|
GetLabelsToNodesResponse {
|
||||||
|
GetLabelsToNodesResponseProto proto = GetLabelsToNodesResponseProto
|
||||||
|
.getDefaultInstance();
|
||||||
|
GetLabelsToNodesResponseProto.Builder builder = null;
|
||||||
|
boolean viaProto = false;
|
||||||
|
|
||||||
|
private Map<String, Set<NodeId>> labelsToNodes;
|
||||||
|
|
||||||
|
public GetLabelsToNodesResponsePBImpl() {
|
||||||
|
this.builder = GetLabelsToNodesResponseProto.newBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public GetLabelsToNodesResponsePBImpl(GetLabelsToNodesResponseProto proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
this.viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initLabelsToNodes() {
|
||||||
|
if (this.labelsToNodes != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
GetLabelsToNodesResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
List<LabelsToNodeIdsProto> list = p.getLabelsToNodesList();
|
||||||
|
this.labelsToNodes = new HashMap<String, Set<NodeId>>();
|
||||||
|
|
||||||
|
for (LabelsToNodeIdsProto c : list) {
|
||||||
|
Set<NodeId> setNodes = new HashSet<NodeId>();
|
||||||
|
for(NodeIdProto n : c.getNodeIdList()) {
|
||||||
|
NodeId node = new NodeIdPBImpl(n);
|
||||||
|
setNodes.add(node);
|
||||||
|
}
|
||||||
|
if(!setNodes.isEmpty()) {
|
||||||
|
this.labelsToNodes.put(c.getNodeLabels(), setNodes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void maybeInitBuilder() {
|
||||||
|
if (viaProto || builder == null) {
|
||||||
|
builder = GetLabelsToNodesResponseProto.newBuilder(proto);
|
||||||
|
}
|
||||||
|
viaProto = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addLabelsToNodesToProto() {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.clearLabelsToNodes();
|
||||||
|
if (labelsToNodes == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Iterable<LabelsToNodeIdsProto> iterable =
|
||||||
|
new Iterable<LabelsToNodeIdsProto>() {
|
||||||
|
@Override
|
||||||
|
public Iterator<LabelsToNodeIdsProto> iterator() {
|
||||||
|
return new Iterator<LabelsToNodeIdsProto>() {
|
||||||
|
|
||||||
|
Iterator<Entry<String, Set<NodeId>>> iter =
|
||||||
|
labelsToNodes.entrySet().iterator();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LabelsToNodeIdsProto next() {
|
||||||
|
Entry<String, Set<NodeId>> now = iter.next();
|
||||||
|
Set<NodeIdProto> nodeProtoSet = new HashSet<NodeIdProto>();
|
||||||
|
for(NodeId n : now.getValue()) {
|
||||||
|
nodeProtoSet.add(convertToProtoFormat(n));
|
||||||
|
}
|
||||||
|
return LabelsToNodeIdsProto.newBuilder()
|
||||||
|
.setNodeLabels(now.getKey()).addAllNodeId(nodeProtoSet)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return iter.hasNext();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
builder.addAllLabelsToNodes(iterable);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToBuilder() {
|
||||||
|
if (this.labelsToNodes != null) {
|
||||||
|
addLabelsToNodesToProto();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToProto() {
|
||||||
|
if (viaProto)
|
||||||
|
maybeInitBuilder();
|
||||||
|
mergeLocalToBuilder();
|
||||||
|
proto = builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GetLabelsToNodesResponseProto getProto() {
|
||||||
|
mergeLocalToProto();
|
||||||
|
proto = viaProto ? proto : builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
private NodeIdProto convertToProtoFormat(NodeId t) {
|
||||||
|
return ((NodeIdPBImpl)t).getProto();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
assert false : "hashCode not designed";
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (other == null)
|
||||||
|
return false;
|
||||||
|
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||||
|
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public void setLabelsToNodes(Map<String, Set<NodeId>> map) {
|
||||||
|
initLabelsToNodes();
|
||||||
|
labelsToNodes.clear();
|
||||||
|
labelsToNodes.putAll(map);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public Map<String, Set<NodeId>> getLabelsToNodes() {
|
||||||
|
initLabelsToNodes();
|
||||||
|
return this.labelsToNodes;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1003,4 +1003,16 @@ public class TestPBImplRecords {
|
||||||
validatePBImplRecord(GetNodesToLabelsResponsePBImpl.class,
|
validatePBImplRecord(GetNodesToLabelsResponsePBImpl.class,
|
||||||
GetNodesToLabelsResponseProto.class);
|
GetNodesToLabelsResponseProto.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLabelsToNodesRequestPBImpl() throws Exception {
|
||||||
|
validatePBImplRecord(GetLabelsToNodesRequestPBImpl.class,
|
||||||
|
GetLabelsToNodesRequestProto.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLabelsToNodesResponsePBImpl() throws Exception {
|
||||||
|
validatePBImplRecord(GetLabelsToNodesResponsePBImpl.class,
|
||||||
|
GetLabelsToNodesResponseProto.class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
||||||
|
@ -1220,6 +1222,19 @@ public class ClientRMService extends AbstractService implements
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetLabelsToNodesResponse getLabelsToNodes(
|
||||||
|
GetLabelsToNodesRequest request) throws YarnException, IOException {
|
||||||
|
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
|
||||||
|
if (request.getNodeLabels() == null || request.getNodeLabels().isEmpty()) {
|
||||||
|
return GetLabelsToNodesResponse.newInstance(
|
||||||
|
labelsMgr.getLabelsToNodes());
|
||||||
|
} else {
|
||||||
|
return GetLabelsToNodesResponse.newInstance(
|
||||||
|
labelsMgr.getLabelsToNodes(request.getNodeLabels()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
||||||
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
|
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||||
|
|
|
@ -70,6 +70,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||||
|
@ -1437,4 +1439,77 @@ public class TestClientRMService {
|
||||||
rpc.stopProxy(client, conf);
|
rpc.stopProxy(client, conf);
|
||||||
rm.close();
|
rm.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLabelsToNodes() throws Exception {
|
||||||
|
MockRM rm = new MockRM() {
|
||||||
|
protected ClientRMService createClientRMService() {
|
||||||
|
return new ClientRMService(this.rmContext, scheduler,
|
||||||
|
this.rmAppManager, this.applicationACLsManager,
|
||||||
|
this.queueACLsManager, this.getRMContext()
|
||||||
|
.getRMDelegationTokenSecretManager());
|
||||||
|
};
|
||||||
|
};
|
||||||
|
rm.start();
|
||||||
|
RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager();
|
||||||
|
labelsMgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
|
||||||
|
|
||||||
|
Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>();
|
||||||
|
map.put(NodeId.newInstance("host1", 0), ImmutableSet.of("x"));
|
||||||
|
map.put(NodeId.newInstance("host1", 1), ImmutableSet.of("z"));
|
||||||
|
map.put(NodeId.newInstance("host2", 0), ImmutableSet.of("y"));
|
||||||
|
map.put(NodeId.newInstance("host3", 0), ImmutableSet.of("y"));
|
||||||
|
map.put(NodeId.newInstance("host3", 1), ImmutableSet.of("z"));
|
||||||
|
labelsMgr.replaceLabelsOnNode(map);
|
||||||
|
|
||||||
|
// Create a client.
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
|
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
|
||||||
|
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||||
|
ApplicationClientProtocol client =
|
||||||
|
(ApplicationClientProtocol) rpc.getProxy(
|
||||||
|
ApplicationClientProtocol.class, rmAddress, conf);
|
||||||
|
|
||||||
|
// Get node labels collection
|
||||||
|
GetClusterNodeLabelsResponse response =
|
||||||
|
client.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance());
|
||||||
|
Assert.assertTrue(response.getNodeLabels().containsAll(
|
||||||
|
Arrays.asList("x", "y", "z")));
|
||||||
|
|
||||||
|
// Get labels to nodes mapping
|
||||||
|
GetLabelsToNodesResponse response1 =
|
||||||
|
client.getLabelsToNodes(GetLabelsToNodesRequest.newInstance());
|
||||||
|
Map<String, Set<NodeId>> labelsToNodes = response1.getLabelsToNodes();
|
||||||
|
Assert.assertTrue(
|
||||||
|
labelsToNodes.keySet().containsAll(Arrays.asList("x", "y", "z")));
|
||||||
|
Assert.assertTrue(
|
||||||
|
labelsToNodes.get("x").containsAll(Arrays.asList(
|
||||||
|
NodeId.newInstance("host1", 0))));
|
||||||
|
Assert.assertTrue(
|
||||||
|
labelsToNodes.get("y").containsAll(Arrays.asList(
|
||||||
|
NodeId.newInstance("host2", 0), NodeId.newInstance("host3", 0))));
|
||||||
|
Assert.assertTrue(
|
||||||
|
labelsToNodes.get("z").containsAll(Arrays.asList(
|
||||||
|
NodeId.newInstance("host1", 1), NodeId.newInstance("host3", 1))));
|
||||||
|
|
||||||
|
// Get labels to nodes mapping for specific labels
|
||||||
|
Set<String> setlabels =
|
||||||
|
new HashSet<String>(Arrays.asList(new String[]{"x", "z"}));
|
||||||
|
GetLabelsToNodesResponse response2 =
|
||||||
|
client.getLabelsToNodes(GetLabelsToNodesRequest.newInstance(setlabels));
|
||||||
|
labelsToNodes = response2.getLabelsToNodes();
|
||||||
|
Assert.assertTrue(
|
||||||
|
labelsToNodes.keySet().containsAll(Arrays.asList("x", "z")));
|
||||||
|
Assert.assertTrue(
|
||||||
|
labelsToNodes.get("x").containsAll(Arrays.asList(
|
||||||
|
NodeId.newInstance("host1", 0))));
|
||||||
|
Assert.assertTrue(
|
||||||
|
labelsToNodes.get("z").containsAll(Arrays.asList(
|
||||||
|
NodeId.newInstance("host1", 1), NodeId.newInstance("host3", 1))));
|
||||||
|
Assert.assertEquals(labelsToNodes.get("y"), null);
|
||||||
|
|
||||||
|
rpc.stopProxy(client, conf);
|
||||||
|
rm.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue