YARN-5467. InputValidator for the FederationStateStore internal APIs. (Giovanni Matteo Fumarola via Subru)
(cherry picked from commit bd44182e70
)
This commit is contained in:
parent
5c84382397
commit
cfafd173bd
|
@ -57,6 +57,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegister
|
|||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
|
||||
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
|
||||
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
|
||||
|
@ -88,6 +91,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
@Override
|
||||
public SubClusterRegisterResponse registerSubCluster(
|
||||
SubClusterRegisterRequest request) throws YarnException {
|
||||
FederationMembershipStateStoreInputValidator
|
||||
.validateSubClusterRegisterRequest(request);
|
||||
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
|
||||
membership.put(subClusterInfo.getSubClusterId(), subClusterInfo);
|
||||
return SubClusterRegisterResponse.newInstance();
|
||||
|
@ -96,6 +101,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
@Override
|
||||
public SubClusterDeregisterResponse deregisterSubCluster(
|
||||
SubClusterDeregisterRequest request) throws YarnException {
|
||||
FederationMembershipStateStoreInputValidator
|
||||
.validateSubClusterDeregisterRequest(request);
|
||||
SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
|
||||
if (subClusterInfo == null) {
|
||||
throw new YarnException(
|
||||
|
@ -111,6 +118,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
public SubClusterHeartbeatResponse subClusterHeartbeat(
|
||||
SubClusterHeartbeatRequest request) throws YarnException {
|
||||
|
||||
FederationMembershipStateStoreInputValidator
|
||||
.validateSubClusterHeartbeatRequest(request);
|
||||
SubClusterId subClusterId = request.getSubClusterId();
|
||||
SubClusterInfo subClusterInfo = membership.get(subClusterId);
|
||||
|
||||
|
@ -129,6 +138,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
@Override
|
||||
public GetSubClusterInfoResponse getSubCluster(
|
||||
GetSubClusterInfoRequest request) throws YarnException {
|
||||
|
||||
FederationMembershipStateStoreInputValidator
|
||||
.validateGetSubClusterInfoRequest(request);
|
||||
SubClusterId subClusterId = request.getSubClusterId();
|
||||
if (!membership.containsKey(subClusterId)) {
|
||||
throw new YarnException(
|
||||
|
@ -157,6 +169,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
@Override
|
||||
public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
|
||||
AddApplicationHomeSubClusterRequest request) throws YarnException {
|
||||
|
||||
FederationApplicationHomeSubClusterStoreInputValidator
|
||||
.validateAddApplicationHomeSubClusterRequest(request);
|
||||
ApplicationId appId =
|
||||
request.getApplicationHomeSubCluster().getApplicationId();
|
||||
|
||||
|
@ -172,6 +187,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
@Override
|
||||
public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
|
||||
UpdateApplicationHomeSubClusterRequest request) throws YarnException {
|
||||
|
||||
FederationApplicationHomeSubClusterStoreInputValidator
|
||||
.validateUpdateApplicationHomeSubClusterRequest(request);
|
||||
ApplicationId appId =
|
||||
request.getApplicationHomeSubCluster().getApplicationId();
|
||||
if (!applications.containsKey(appId)) {
|
||||
|
@ -186,6 +204,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
@Override
|
||||
public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
|
||||
GetApplicationHomeSubClusterRequest request) throws YarnException {
|
||||
|
||||
FederationApplicationHomeSubClusterStoreInputValidator
|
||||
.validateGetApplicationHomeSubClusterRequest(request);
|
||||
ApplicationId appId = request.getApplicationId();
|
||||
if (!applications.containsKey(appId)) {
|
||||
throw new YarnException("Application " + appId + " does not exist");
|
||||
|
@ -212,6 +233,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
@Override
|
||||
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
|
||||
DeleteApplicationHomeSubClusterRequest request) throws YarnException {
|
||||
|
||||
FederationApplicationHomeSubClusterStoreInputValidator
|
||||
.validateDeleteApplicationHomeSubClusterRequest(request);
|
||||
ApplicationId appId = request.getApplicationId();
|
||||
if (!applications.containsKey(appId)) {
|
||||
throw new YarnException("Application " + appId + " does not exist");
|
||||
|
@ -224,6 +248,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
@Override
|
||||
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
|
||||
GetSubClusterPolicyConfigurationRequest request) throws YarnException {
|
||||
|
||||
FederationPolicyStoreInputValidator
|
||||
.validateGetSubClusterPolicyConfigurationRequest(request);
|
||||
String queue = request.getQueue();
|
||||
if (!policies.containsKey(queue)) {
|
||||
throw new YarnException("Policy for queue " + queue + " does not exist");
|
||||
|
@ -236,6 +263,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|||
@Override
|
||||
public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
|
||||
SetSubClusterPolicyConfigurationRequest request) throws YarnException {
|
||||
|
||||
FederationPolicyStoreInputValidator
|
||||
.validateSetSubClusterPolicyConfigurationRequest(request);
|
||||
policies.put(request.getPolicyConfiguration().getQueue(),
|
||||
request.getPolicyConfiguration());
|
||||
return SetSubClusterPolicyConfigurationResponse.newInstance();
|
||||
|
|
|
@ -0,0 +1,183 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.store.utils;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Utility class to validate the inputs to
|
||||
* {@code FederationApplicationHomeSubClusterStore}, allows a fail fast
|
||||
* mechanism for invalid user inputs.
|
||||
*
|
||||
*/
|
||||
public final class FederationApplicationHomeSubClusterStoreInputValidator {
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(FederationApplicationHomeSubClusterStoreInputValidator.class);
|
||||
|
||||
private FederationApplicationHomeSubClusterStoreInputValidator() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Quick validation on the input to check some obvious fail conditions (fail
|
||||
* fast). Check if the provided {@link AddApplicationHomeSubClusterRequest}
|
||||
* for adding a new application is valid or not.
|
||||
*
|
||||
* @param request the {@link AddApplicationHomeSubClusterRequest} to validate
|
||||
* against
|
||||
* @throws FederationStateStoreInvalidInputException if the request is invalid
|
||||
*/
|
||||
public static void validateAddApplicationHomeSubClusterRequest(
|
||||
AddApplicationHomeSubClusterRequest request)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (request == null) {
|
||||
String message = "Missing AddApplicationHomeSubCluster Request."
|
||||
+ " Please try again by specifying"
|
||||
+ " an AddApplicationHomeSubCluster information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
|
||||
// validate ApplicationHomeSubCluster info
|
||||
checkApplicationHomeSubCluster(request.getApplicationHomeSubCluster());
|
||||
}
|
||||
|
||||
/**
|
||||
* Quick validation on the input to check some obvious fail conditions (fail
|
||||
* fast). Check if the provided {@link UpdateApplicationHomeSubClusterRequest}
|
||||
* for updating an application is valid or not.
|
||||
*
|
||||
* @param request the {@link UpdateApplicationHomeSubClusterRequest} to
|
||||
* validate against
|
||||
* @throws FederationStateStoreInvalidInputException if the request is invalid
|
||||
*/
|
||||
public static void validateUpdateApplicationHomeSubClusterRequest(
|
||||
UpdateApplicationHomeSubClusterRequest request)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (request == null) {
|
||||
String message = "Missing UpdateApplicationHomeSubCluster Request."
|
||||
+ " Please try again by specifying"
|
||||
+ " an ApplicationHomeSubCluster information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
|
||||
// validate ApplicationHomeSubCluster info
|
||||
checkApplicationHomeSubCluster(request.getApplicationHomeSubCluster());
|
||||
}
|
||||
|
||||
/**
|
||||
* Quick validation on the input to check some obvious fail conditions (fail
|
||||
* fast). Check if the provided {@link GetApplicationHomeSubClusterRequest}
|
||||
* for querying application's information is valid or not.
|
||||
*
|
||||
* @param request the {@link GetApplicationHomeSubClusterRequest} to validate
|
||||
* against
|
||||
* @throws FederationStateStoreInvalidInputException if the request is invalid
|
||||
*/
|
||||
public static void validateGetApplicationHomeSubClusterRequest(
|
||||
GetApplicationHomeSubClusterRequest request)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (request == null) {
|
||||
String message = "Missing GetApplicationHomeSubCluster Request."
|
||||
+ " Please try again by specifying an Application Id information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
|
||||
// validate application Id
|
||||
checkApplicationId(request.getApplicationId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Quick validation on the input to check some obvious fail conditions (fail
|
||||
* fast). Check if the provided {@link DeleteApplicationHomeSubClusterRequest}
|
||||
* for deleting an application is valid or not.
|
||||
*
|
||||
* @param request the {@link DeleteApplicationHomeSubClusterRequest} to
|
||||
* validate against
|
||||
* @throws FederationStateStoreInvalidInputException if the request is invalid
|
||||
*/
|
||||
public static void validateDeleteApplicationHomeSubClusterRequest(
|
||||
DeleteApplicationHomeSubClusterRequest request)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (request == null) {
|
||||
String message = "Missing DeleteApplicationHomeSubCluster Request."
|
||||
+ " Please try again by specifying"
|
||||
+ " an ApplicationHomeSubCluster information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
|
||||
// validate application Id
|
||||
checkApplicationId(request.getApplicationId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if the ApplicationHomeSubCluster info are present or not.
|
||||
*
|
||||
* @param applicationHomeSubCluster the information of the application to be
|
||||
* verified
|
||||
* @throws FederationStateStoreInvalidInputException if the SubCluster Info
|
||||
* are invalid
|
||||
*/
|
||||
private static void checkApplicationHomeSubCluster(
|
||||
ApplicationHomeSubCluster applicationHomeSubCluster)
|
||||
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (applicationHomeSubCluster == null) {
|
||||
String message = "Missing ApplicationHomeSubCluster Info."
|
||||
+ " Please try again by specifying"
|
||||
+ " an ApplicationHomeSubCluster information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
// validate application Id
|
||||
checkApplicationId(applicationHomeSubCluster.getApplicationId());
|
||||
|
||||
// validate subcluster Id
|
||||
FederationMembershipStateStoreInputValidator
|
||||
.checkSubClusterId(applicationHomeSubCluster.getHomeSubCluster());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if the application id is present or not.
|
||||
*
|
||||
* @param appId the id of the application to be verified
|
||||
* @throws FederationStateStoreInvalidInputException if the application Id is
|
||||
* invalid
|
||||
*/
|
||||
private static void checkApplicationId(ApplicationId appId)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (appId == null) {
|
||||
String message = "Missing Application Id."
|
||||
+ " Please try again by specifying an Application Id.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,317 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.store.utils;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Utility class to validate the inputs to
|
||||
* {@code FederationMembershipStateStore}, allows a fail fast mechanism for
|
||||
* invalid user inputs.
|
||||
*
|
||||
*/
|
||||
public final class FederationMembershipStateStoreInputValidator {
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(FederationMembershipStateStoreInputValidator.class);
|
||||
|
||||
private FederationMembershipStateStoreInputValidator() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Quick validation on the input to check some obvious fail conditions (fail
|
||||
* fast). Check if the provided {@link SubClusterRegisterRequest} for
|
||||
* registration a new subcluster is valid or not.
|
||||
*
|
||||
* @param request the {@link SubClusterRegisterRequest} to validate against
|
||||
* @throws FederationStateStoreInvalidInputException if the request is invalid
|
||||
*/
|
||||
public static void validateSubClusterRegisterRequest(
|
||||
SubClusterRegisterRequest request)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
|
||||
// check if the request is present
|
||||
if (request == null) {
|
||||
String message = "Missing SubClusterRegister Request."
|
||||
+ " Please try again by specifying a"
|
||||
+ " SubCluster Register Information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
|
||||
}
|
||||
|
||||
// validate subcluster info
|
||||
checkSubClusterInfo(request.getSubClusterInfo());
|
||||
}
|
||||
|
||||
/**
|
||||
* Quick validation on the input to check some obvious fail conditions (fail
|
||||
* fast). Check if the provided {@link SubClusterDeregisterRequest} for
|
||||
* deregistration a subcluster is valid or not.
|
||||
*
|
||||
* @param request the {@link SubClusterDeregisterRequest} to validate against
|
||||
* @throws FederationStateStoreInvalidInputException if the request is invalid
|
||||
*/
|
||||
public static void validateSubClusterDeregisterRequest(
|
||||
SubClusterDeregisterRequest request)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
|
||||
// check if the request is present
|
||||
if (request == null) {
|
||||
String message = "Missing SubClusterDeregister Request."
|
||||
+ " Please try again by specifying a"
|
||||
+ " SubCluster Deregister Information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
|
||||
// validate subcluster id
|
||||
checkSubClusterId(request.getSubClusterId());
|
||||
// validate subcluster state
|
||||
checkSubClusterState(request.getState());
|
||||
if (!request.getState().isFinal()) {
|
||||
String message = "Invalid non-final state: " + request.getState();
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Quick validation on the input to check some obvious fail conditions (fail
|
||||
* fast). Check if the provided {@link SubClusterHeartbeatRequest} for
|
||||
* heartbeating a subcluster is valid or not.
|
||||
*
|
||||
* @param request the {@link SubClusterHeartbeatRequest} to validate against
|
||||
* @throws FederationStateStoreInvalidInputException if the request is invalid
|
||||
*/
|
||||
public static void validateSubClusterHeartbeatRequest(
|
||||
SubClusterHeartbeatRequest request)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
|
||||
// check if the request is present
|
||||
if (request == null) {
|
||||
String message = "Missing SubClusterHeartbeat Request."
|
||||
+ " Please try again by specifying a"
|
||||
+ " SubCluster Heartbeat Information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
|
||||
// validate subcluster id
|
||||
checkSubClusterId(request.getSubClusterId());
|
||||
// validate last heartbeat timestamp
|
||||
checkTimestamp(request.getLastHeartBeat());
|
||||
// validate subcluster capability
|
||||
checkCapability(request.getCapability());
|
||||
// validate subcluster state
|
||||
checkSubClusterState(request.getState());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Quick validation on the input to check some obvious fail conditions (fail
|
||||
* fast). Check if the provided {@link GetSubClusterInfoRequest} for querying
|
||||
* subcluster's information is valid or not.
|
||||
*
|
||||
* @param request the {@link GetSubClusterInfoRequest} to validate against
|
||||
* @throws FederationStateStoreInvalidInputException if the request is invalid
|
||||
*/
|
||||
public static void validateGetSubClusterInfoRequest(
|
||||
GetSubClusterInfoRequest request)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
|
||||
// check if the request is present
|
||||
if (request == null) {
|
||||
String message = "Missing GetSubClusterInfo Request."
|
||||
+ " Please try again by specifying a Get SubCluster information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
|
||||
// validate subcluster id
|
||||
checkSubClusterId(request.getSubClusterId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if the SubCluster Info are present or not.
|
||||
*
|
||||
* @param subClusterInfo the information of the subcluster to be verified
|
||||
* @throws FederationStateStoreInvalidInputException if the SubCluster Info
|
||||
* are invalid
|
||||
*/
|
||||
private static void checkSubClusterInfo(SubClusterInfo subClusterInfo)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (subClusterInfo == null) {
|
||||
String message = "Missing SubCluster Information."
|
||||
+ " Please try again by specifying SubCluster Information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
|
||||
// validate subcluster id
|
||||
checkSubClusterId(subClusterInfo.getSubClusterId());
|
||||
|
||||
// validate AMRM Service address
|
||||
checkAddress(subClusterInfo.getAMRMServiceAddress());
|
||||
// validate ClientRM Service address
|
||||
checkAddress(subClusterInfo.getClientRMServiceAddress());
|
||||
// validate RMClient Service address
|
||||
checkAddress(subClusterInfo.getRMAdminServiceAddress());
|
||||
// validate RMWeb Service address
|
||||
checkAddress(subClusterInfo.getRMWebServiceAddress());
|
||||
|
||||
// validate last heartbeat timestamp
|
||||
checkTimestamp(subClusterInfo.getLastHeartBeat());
|
||||
// validate last start timestamp
|
||||
checkTimestamp(subClusterInfo.getLastStartTime());
|
||||
|
||||
// validate subcluster state
|
||||
checkSubClusterState(subClusterInfo.getState());
|
||||
|
||||
// validate subcluster capability
|
||||
checkCapability(subClusterInfo.getCapability());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if the timestamp is positive or not.
|
||||
*
|
||||
* @param timestamp the timestamp to be verified
|
||||
* @throws FederationStateStoreInvalidInputException if the timestamp is
|
||||
* invalid
|
||||
*/
|
||||
private static void checkTimestamp(long timestamp)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (timestamp < 0) {
|
||||
String message = "Invalid timestamp information."
|
||||
+ " Please try again by specifying valid Timestamp Information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if the Capability is present or not.
|
||||
*
|
||||
* @param capability the capability of the subcluster to be verified
|
||||
* @throws FederationStateStoreInvalidInputException if the capability is
|
||||
* invalid
|
||||
*/
|
||||
private static void checkCapability(String capability)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (capability == null || capability.isEmpty()) {
|
||||
String message = "Invalid capability information."
|
||||
+ " Please try again by specifying valid Capability Information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if the SubCluster Id is present or not.
|
||||
*
|
||||
* @param subClusterId the identifier of the subcluster to be verified
|
||||
* @throws FederationStateStoreInvalidInputException if the SubCluster Id is
|
||||
* invalid
|
||||
*/
|
||||
protected static void checkSubClusterId(SubClusterId subClusterId)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
// check if cluster id is present
|
||||
if (subClusterId == null) {
|
||||
String message = "Missing SubCluster Id information."
|
||||
+ " Please try again by specifying Subcluster Id information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
// check if cluster id is valid
|
||||
if (subClusterId.getId().isEmpty()) {
|
||||
String message = "Invalid SubCluster Id information."
|
||||
+ " Please try again by specifying valid Subcluster Id.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if the SubCluster Address is a valid URL or not.
|
||||
*
|
||||
* @param address the endpoint of the subcluster to be verified
|
||||
* @throws FederationStateStoreInvalidInputException if the address is invalid
|
||||
*/
|
||||
private static void checkAddress(String address)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
// Ensure url is not null
|
||||
if (address == null || address.isEmpty()) {
|
||||
String message = "Missing SubCluster Endpoint information."
|
||||
+ " Please try again by specifying SubCluster Endpoint information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
// Validate url is well formed
|
||||
boolean hasScheme = address.contains("://");
|
||||
URI uri = null;
|
||||
try {
|
||||
uri = hasScheme ? URI.create(address)
|
||||
: URI.create("dummyscheme://" + address);
|
||||
} catch (IllegalArgumentException e) {
|
||||
String message = "The provided SubCluster Endpoint does not contain a"
|
||||
+ " valid host:port authority: " + address;
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
String host = uri.getHost();
|
||||
int port = uri.getPort();
|
||||
String path = uri.getPath();
|
||||
if ((host == null) || (port < 0)
|
||||
|| (!hasScheme && path != null && !path.isEmpty())) {
|
||||
String message = "The provided SubCluster Endpoint does not contain a"
|
||||
+ " valid host:port authority: " + address;
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if the SubCluster State is present or not.
|
||||
*
|
||||
* @param state the state of the subcluster to be verified
|
||||
* @throws FederationStateStoreInvalidInputException if the SubCluster State
|
||||
* is invalid
|
||||
*/
|
||||
private static void checkSubClusterState(SubClusterState state)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
// check sub-cluster state is not empty
|
||||
if (state == null) {
|
||||
String message = "Missing SubCluster State information."
|
||||
+ " Please try again by specifying SubCluster State information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,144 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.store.utils;
|
||||
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Utility class to validate the inputs to {@code FederationPolicyStore}, allows
|
||||
* a fail fast mechanism for invalid user inputs.
|
||||
*
|
||||
*/
|
||||
public final class FederationPolicyStoreInputValidator {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FederationPolicyStoreInputValidator.class);
|
||||
|
||||
private FederationPolicyStoreInputValidator() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Quick validation on the input to check some obvious fail conditions (fail
|
||||
* fast). Check if the provided
|
||||
* {@link GetSubClusterPolicyConfigurationRequest} for querying policy's
|
||||
* information is valid or not.
|
||||
*
|
||||
* @param request the {@link GetSubClusterPolicyConfigurationRequest} to
|
||||
* validate against
|
||||
* @throws FederationStateStoreInvalidInputException if the request is invalid
|
||||
*/
|
||||
public static void validateGetSubClusterPolicyConfigurationRequest(
|
||||
GetSubClusterPolicyConfigurationRequest request)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (request == null) {
|
||||
String message = "Missing GetSubClusterPolicyConfiguration Request."
|
||||
+ " Please try again by specifying a policy selection information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
|
||||
// validate queue id
|
||||
checkQueue(request.getQueue());
|
||||
}
|
||||
|
||||
/**
|
||||
* Quick validation on the input to check some obvious fail conditions (fail
|
||||
* fast). Check if the provided
|
||||
* {@link SetSubClusterPolicyConfigurationRequest} for adding a new policy is
|
||||
* valid or not.
|
||||
*
|
||||
* @param request the {@link SetSubClusterPolicyConfigurationRequest} to
|
||||
* validate against
|
||||
* @throws FederationStateStoreInvalidInputException if the request is invalid
|
||||
*/
|
||||
public static void validateSetSubClusterPolicyConfigurationRequest(
|
||||
SetSubClusterPolicyConfigurationRequest request)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (request == null) {
|
||||
String message = "Missing SetSubClusterPolicyConfiguration Request."
|
||||
+ " Please try again by specifying an policy insertion information.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
|
||||
// validate subcluster policy configuration
|
||||
checkSubClusterPolicyConfiguration(request.getPolicyConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if the SubClusterPolicyConfiguration is valid or not.
|
||||
*
|
||||
* @param policyConfiguration the policy information to be verified
|
||||
* @throws FederationStateStoreInvalidInputException if the policy information
|
||||
* are invalid
|
||||
*/
|
||||
private static void checkSubClusterPolicyConfiguration(
|
||||
SubClusterPolicyConfiguration policyConfiguration)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (policyConfiguration == null) {
|
||||
String message = "Missing SubClusterPolicyConfiguration."
|
||||
+ " Please try again by specifying a SubClusterPolicyConfiguration.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
|
||||
// validate queue id
|
||||
checkQueue(policyConfiguration.getQueue());
|
||||
// validate policy type
|
||||
checkType(policyConfiguration.getType());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if the queue id is a valid or not.
|
||||
*
|
||||
* @param queue the queue id of the policy to be verified
|
||||
* @throws FederationStateStoreInvalidInputException if the queue id is
|
||||
* invalid
|
||||
*/
|
||||
private static void checkQueue(String queue)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (queue == null || queue.isEmpty()) {
|
||||
String message = "Missing Queue. Please try again by specifying a Queue.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if the policy type is a valid or not.
|
||||
*
|
||||
* @param type the type of the policy to be verified
|
||||
* @throws FederationStateStoreInvalidInputException if the policy is invalid
|
||||
*/
|
||||
private static void checkType(String type)
|
||||
throws FederationStateStoreInvalidInputException {
|
||||
if (type == null || type.isEmpty()) {
|
||||
String message = "Missing Policy Type."
|
||||
+ " Please try again by specifying a Policy Type.";
|
||||
LOG.warn(message);
|
||||
throw new FederationStateStoreInvalidInputException(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.store.utils;
|
||||
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
/**
|
||||
* Exception thrown by the {@link FederationMembershipStateStoreInputValidator},
|
||||
* {@link FederationApplicationHomeSubClusterStoreInputValidator},
|
||||
* {@link FederationPolicyStoreInputValidator} if the input is invalid.
|
||||
*
|
||||
*/
|
||||
public class FederationStateStoreInvalidInputException extends YarnException {
|
||||
|
||||
/**
|
||||
* IDE auto-generated.
|
||||
*/
|
||||
private static final long serialVersionUID = -7352144682711430801L;
|
||||
|
||||
public FederationStateStoreInvalidInputException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public FederationStateStoreInvalidInputException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public FederationStateStoreInvalidInputException(String message,
|
||||
Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.federation.store.utils;
|
|
@ -162,9 +162,9 @@ public abstract class FederationStateStoreBaseTest {
|
|||
SubClusterRegisterRequest.newInstance(subClusterInfo2));
|
||||
|
||||
stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
|
||||
.newInstance(subClusterId1, SubClusterState.SC_RUNNING, ""));
|
||||
stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
|
||||
.newInstance(subClusterId2, SubClusterState.SC_UNHEALTHY, ""));
|
||||
.newInstance(subClusterId1, SubClusterState.SC_RUNNING, "capability"));
|
||||
stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest.newInstance(
|
||||
subClusterId2, SubClusterState.SC_UNHEALTHY, "capability"));
|
||||
|
||||
Assert.assertTrue(
|
||||
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue