YARN-11370. [Federation] Refactor MemoryFederationStateStore code. (#5126)
This commit is contained in:
parent
e8a6b2c2c4
commit
2e997d818d
|
@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
|
||||
|
@ -107,7 +108,7 @@ import static org.apache.hadoop.yarn.server.federation.store.utils.FederationSta
|
|||
public class MemoryFederationStateStore implements FederationStateStore {
|
||||
|
||||
private Map<SubClusterId, SubClusterInfo> membership;
|
||||
private Map<ApplicationId, SubClusterId> applications;
|
||||
private Map<ApplicationId, ApplicationHomeSubCluster> applications;
|
||||
private Map<ReservationId, SubClusterId> reservations;
|
||||
private Map<String, SubClusterPolicyConfiguration> policies;
|
||||
private RouterRMDTSecretManagerState routerRMSecretManagerState;
|
||||
|
@ -122,10 +123,10 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
|
||||
@Override
|
||||
public void init(Configuration conf) {
|
||||
membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
|
||||
applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
|
||||
reservations = new ConcurrentHashMap<ReservationId, SubClusterId>();
|
||||
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
|
||||
membership = new ConcurrentHashMap<>();
|
||||
applications = new ConcurrentHashMap<>();
|
||||
reservations = new ConcurrentHashMap<>();
|
||||
policies = new ConcurrentHashMap<>();
|
||||
routerRMSecretManagerState = new RouterRMDTSecretManagerState();
|
||||
maxAppsInStateStore = conf.getInt(
|
||||
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
|
||||
|
@ -143,14 +144,15 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public SubClusterRegisterResponse registerSubCluster(
|
||||
SubClusterRegisterRequest request) throws YarnException {
|
||||
public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest request)
|
||||
throws YarnException {
|
||||
long startTime = clock.getTime();
|
||||
|
||||
FederationMembershipStateStoreInputValidator.validate(request);
|
||||
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
|
||||
|
||||
long currentTime =
|
||||
Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
|
||||
|
||||
SubClusterInfo subClusterInfoToSave =
|
||||
SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(),
|
||||
subClusterInfo.getAMRMServiceAddress(),
|
||||
|
@ -161,18 +163,21 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
subClusterInfo.getCapability());
|
||||
|
||||
membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave);
|
||||
long stopTime = clock.getTime();
|
||||
|
||||
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
|
||||
return SubClusterRegisterResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubClusterDeregisterResponse deregisterSubCluster(
|
||||
SubClusterDeregisterRequest request) throws YarnException {
|
||||
public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest request)
|
||||
throws YarnException {
|
||||
|
||||
FederationMembershipStateStoreInputValidator.validate(request);
|
||||
SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
|
||||
if (subClusterInfo == null) {
|
||||
String errMsg =
|
||||
"SubCluster " + request.getSubClusterId().toString() + " not found";
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
||||
FederationStateStoreUtils.logAndThrowStoreException(
|
||||
LOG, "SubCluster %s not found", request.getSubClusterId());
|
||||
} else {
|
||||
subClusterInfo.setState(request.getState());
|
||||
}
|
||||
|
@ -181,17 +186,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public SubClusterHeartbeatResponse subClusterHeartbeat(
|
||||
SubClusterHeartbeatRequest request) throws YarnException {
|
||||
public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest request)
|
||||
throws YarnException {
|
||||
|
||||
FederationMembershipStateStoreInputValidator.validate(request);
|
||||
SubClusterId subClusterId = request.getSubClusterId();
|
||||
SubClusterInfo subClusterInfo = membership.get(subClusterId);
|
||||
|
||||
if (subClusterInfo == null) {
|
||||
String errMsg = "SubCluster " + subClusterId.toString()
|
||||
+ " does not exist; cannot heartbeat";
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
||||
FederationStateStoreUtils.logAndThrowStoreException(
|
||||
LOG, "SubCluster %s does not exist; cannot heartbeat.", request.getSubClusterId());
|
||||
}
|
||||
|
||||
long currentTime =
|
||||
|
@ -205,11 +209,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public GetSubClusterInfoResponse getSubCluster(
|
||||
GetSubClusterInfoRequest request) throws YarnException {
|
||||
public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request)
|
||||
throws YarnException {
|
||||
|
||||
FederationMembershipStateStoreInputValidator.validate(request);
|
||||
SubClusterId subClusterId = request.getSubClusterId();
|
||||
|
||||
if (!membership.containsKey(subClusterId)) {
|
||||
LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
|
||||
return null;
|
||||
|
@ -219,16 +224,17 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public GetSubClustersInfoResponse getSubClusters(
|
||||
GetSubClustersInfoRequest request) throws YarnException {
|
||||
List<SubClusterInfo> result = new ArrayList<SubClusterInfo>();
|
||||
public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest request)
|
||||
throws YarnException {
|
||||
|
||||
List<SubClusterInfo> result = new ArrayList<>();
|
||||
|
||||
for (SubClusterInfo info : membership.values()) {
|
||||
if (!request.getFilterInactiveSubClusters()
|
||||
|| info.getState().isActive()) {
|
||||
if (!request.getFilterInactiveSubClusters() || info.getState().isActive()) {
|
||||
result.add(info);
|
||||
}
|
||||
}
|
||||
|
||||
return GetSubClustersInfoResponse.newInstance(result);
|
||||
}
|
||||
|
||||
|
@ -239,16 +245,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
AddApplicationHomeSubClusterRequest request) throws YarnException {
|
||||
|
||||
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
||||
ApplicationId appId =
|
||||
request.getApplicationHomeSubCluster().getApplicationId();
|
||||
ApplicationHomeSubCluster homeSubCluster = request.getApplicationHomeSubCluster();
|
||||
|
||||
ApplicationId appId = homeSubCluster.getApplicationId();
|
||||
|
||||
if (!applications.containsKey(appId)) {
|
||||
applications.put(appId,
|
||||
request.getApplicationHomeSubCluster().getHomeSubCluster());
|
||||
applications.put(appId, homeSubCluster);
|
||||
}
|
||||
|
||||
return AddApplicationHomeSubClusterResponse
|
||||
.newInstance(applications.get(appId));
|
||||
ApplicationHomeSubCluster respHomeSubCluster = applications.get(appId);
|
||||
return AddApplicationHomeSubClusterResponse.newInstance(respHomeSubCluster.getHomeSubCluster());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -256,15 +262,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
UpdateApplicationHomeSubClusterRequest request) throws YarnException {
|
||||
|
||||
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
||||
|
||||
ApplicationId appId =
|
||||
request.getApplicationHomeSubCluster().getApplicationId();
|
||||
|
||||
if (!applications.containsKey(appId)) {
|
||||
String errMsg = "Application " + appId + " does not exist";
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
||||
"Application %s does not exist.", appId);
|
||||
}
|
||||
|
||||
applications.put(appId,
|
||||
request.getApplicationHomeSubCluster().getHomeSubCluster());
|
||||
applications.put(appId, request.getApplicationHomeSubCluster());
|
||||
return UpdateApplicationHomeSubClusterResponse.newInstance();
|
||||
}
|
||||
|
||||
|
@ -275,11 +282,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
||||
ApplicationId appId = request.getApplicationId();
|
||||
if (!applications.containsKey(appId)) {
|
||||
String errMsg = "Application " + appId + " does not exist";
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
||||
"Application %s does not exist.", appId);
|
||||
}
|
||||
|
||||
return GetApplicationHomeSubClusterResponse.newInstance(appId, applications.get(appId));
|
||||
return GetApplicationHomeSubClusterResponse.newInstance(appId,
|
||||
applications.get(appId).getHomeSubCluster());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -303,7 +311,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
}
|
||||
|
||||
private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) {
|
||||
SubClusterId subClusterId = applications.get(applicationId);
|
||||
SubClusterId subClusterId = applications.get(applicationId).getHomeSubCluster();
|
||||
return ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
|
||||
}
|
||||
|
||||
|
@ -314,8 +322,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
||||
ApplicationId appId = request.getApplicationId();
|
||||
if (!applications.containsKey(appId)) {
|
||||
String errMsg = "Application " + appId + " does not exist";
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
||||
"Application %s does not exist.", appId);
|
||||
}
|
||||
|
||||
applications.remove(appId);
|
||||
|
@ -329,12 +337,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
FederationPolicyStoreInputValidator.validate(request);
|
||||
String queue = request.getQueue();
|
||||
if (!policies.containsKey(queue)) {
|
||||
LOG.warn("Policy for queue: {} does not exist.", queue);
|
||||
LOG.warn("Policy for queue : {} does not exist.", queue);
|
||||
return null;
|
||||
}
|
||||
|
||||
return GetSubClusterPolicyConfigurationResponse
|
||||
.newInstance(policies.get(queue));
|
||||
return GetSubClusterPolicyConfigurationResponse.newInstance(policies.get(queue));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -350,8 +357,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
@Override
|
||||
public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
|
||||
GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
|
||||
ArrayList<SubClusterPolicyConfiguration> result =
|
||||
new ArrayList<SubClusterPolicyConfiguration>();
|
||||
ArrayList<SubClusterPolicyConfiguration> result = new ArrayList<>();
|
||||
for (SubClusterPolicyConfiguration policy : policies.values()) {
|
||||
result.add(policy);
|
||||
}
|
||||
|
@ -386,7 +392,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
|
||||
ReservationId reservationId = request.getReservationId();
|
||||
if (!reservations.containsKey(reservationId)) {
|
||||
throw new YarnException("Reservation " + reservationId + " does not exist");
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
||||
"Reservation %s does not exist.", reservationId);
|
||||
}
|
||||
SubClusterId subClusterId = reservations.get(reservationId);
|
||||
ReservationHomeSubCluster homeSubCluster =
|
||||
|
@ -417,7 +424,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
ReservationId reservationId = request.getReservationHomeSubCluster().getReservationId();
|
||||
|
||||
if (!reservations.containsKey(reservationId)) {
|
||||
throw new YarnException("Reservation " + reservationId + " does not exist.");
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
||||
"Reservation %s does not exist.", reservationId);
|
||||
}
|
||||
|
||||
SubClusterId subClusterId = request.getReservationHomeSubCluster().getHomeSubCluster();
|
||||
|
@ -431,7 +439,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
|
||||
ReservationId reservationId = request.getReservationId();
|
||||
if (!reservations.containsKey(reservationId)) {
|
||||
throw new YarnException("Reservation " + reservationId + " does not exist");
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
||||
"Reservation %s does not exist.", reservationId);
|
||||
}
|
||||
reservations.remove(reservationId);
|
||||
return DeleteReservationHomeSubClusterResponse.newInstance();
|
||||
|
@ -446,9 +455,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
|
||||
Set<DelegationKey> rmDTMasterKeyState = routerRMSecretManagerState.getMasterKeyState();
|
||||
if (rmDTMasterKeyState.contains(delegationKey)) {
|
||||
LOG.info("Error storing info for RMDTMasterKey with keyID: {}.", delegationKey.getKeyId());
|
||||
throw new IOException("RMDTMasterKey with keyID: " + delegationKey.getKeyId() +
|
||||
" is already stored");
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
||||
"Error storing info for RMDTMasterKey with keyID: %s.", delegationKey.getKeyId());
|
||||
}
|
||||
|
||||
routerRMSecretManagerState.getMasterKeyState().add(delegationKey);
|
||||
|
|
|
@ -164,12 +164,9 @@ public abstract class FederationStateStoreBaseTest {
|
|||
|
||||
SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
|
||||
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
|
||||
try {
|
||||
stateStore.deregisterSubCluster(deregisterRequest);
|
||||
Assert.fail();
|
||||
} catch (FederationStateStoreException e) {
|
||||
Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
|
||||
}
|
||||
|
||||
LambdaTestUtils.intercept(YarnException.class,
|
||||
"SubCluster SC not found", () -> stateStore.deregisterSubCluster(deregisterRequest));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -266,13 +263,9 @@ public abstract class FederationStateStoreBaseTest {
|
|||
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
|
||||
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability");
|
||||
|
||||
try {
|
||||
stateStore.subClusterHeartbeat(heartbeatRequest);
|
||||
Assert.fail();
|
||||
} catch (FederationStateStoreException e) {
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.startsWith("SubCluster SC does not exist; cannot heartbeat"));
|
||||
}
|
||||
LambdaTestUtils.intercept(YarnException.class,
|
||||
"SubCluster SC does not exist; cannot heartbeat",
|
||||
() -> stateStore.subClusterHeartbeat(heartbeatRequest));
|
||||
}
|
||||
|
||||
// Test FederationApplicationHomeSubClusterStore
|
||||
|
|
Loading…
Reference in New Issue