diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index cea4ac250ca..a540dff3d5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -20,34 +20,71 @@ package org.apache.hadoop.yarn.server.federation.store.impl; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; -import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.util.MonotonicClock; /** - * In-memory implementation of FederationMembershipStateStore. + * In-memory implementation of {@link FederationStateStore}. */ -public class MemoryFederationStateStore - implements FederationMembershipStateStore { +public class MemoryFederationStateStore implements FederationStateStore { + + private Map membership; + private Map applications; + private Map policies; - private final Map membership = - new ConcurrentHashMap(); private final MonotonicClock clock = new MonotonicClock(); + @Override + public void init(Configuration conf) { + membership = new ConcurrentHashMap(); + applications = new ConcurrentHashMap(); + policies = new ConcurrentHashMap(); + } + + @Override + public void close() { + membership = null; + applications = null; + policies = null; + } + @Override public SubClusterRegisterResponse registerSubCluster( SubClusterRegisterRequest request) throws YarnException { @@ -116,4 +153,113 @@ public class MemoryFederationStateStore return GetSubClustersInfoResponse.newInstance(result); } + // FederationApplicationHomeSubClusterStore methods + + @Override + public AddApplicationHomeSubClusterResponse addApplicationHomeSubClusterMap( + AddApplicationHomeSubClusterRequest request) throws YarnException { + ApplicationId appId = + request.getApplicationHomeSubCluster().getApplicationId(); + if (applications.containsKey(appId)) { + throw new YarnException("Application " + appId + " already exists"); + } + + applications.put(appId, + request.getApplicationHomeSubCluster().getHomeSubCluster()); + return AddApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubClusterMap( + UpdateApplicationHomeSubClusterRequest request) throws YarnException { + ApplicationId appId = + request.getApplicationHomeSubCluster().getApplicationId(); + if (!applications.containsKey(appId)) { + throw new YarnException("Application " + appId + " does not exist"); + } + + applications.put(appId, + request.getApplicationHomeSubCluster().getHomeSubCluster()); + return UpdateApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public GetApplicationHomeSubClusterResponse getApplicationHomeSubClusterMap( + GetApplicationHomeSubClusterRequest request) throws YarnException { + ApplicationId appId = request.getApplicationId(); + if (!applications.containsKey(appId)) { + throw new YarnException("Application " + appId + " does not exist"); + } + + return GetApplicationHomeSubClusterResponse.newInstance( + ApplicationHomeSubCluster.newInstance(appId, applications.get(appId))); + } + + @Override + public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubClusterMap( + GetApplicationsHomeSubClusterRequest request) throws YarnException { + List result = + new ArrayList(); + for (Entry e : applications.entrySet()) { + result + .add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue())); + } + + GetApplicationsHomeSubClusterResponse.newInstance(result); + return GetApplicationsHomeSubClusterResponse.newInstance(result); + } + + @Override + public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubClusterMap( + DeleteApplicationHomeSubClusterRequest request) throws YarnException { + ApplicationId appId = request.getApplicationId(); + if (!applications.containsKey(appId)) { + throw new YarnException("Application " + appId + " does not exist"); + } + + applications.remove(appId); + return DeleteApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest request) throws YarnException { + String queue = request.getQueue(); + if (!policies.containsKey(queue)) { + throw new YarnException("Policy for queue " + queue + " does not exist"); + } + + return GetSubClusterPolicyConfigurationResponse + .newInstance(policies.get(queue)); + } + + @Override + public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( + SetSubClusterPolicyConfigurationRequest request) throws YarnException { + policies.put(request.getPolicyConfiguration().getQueue(), + request.getPolicyConfiguration()); + return SetSubClusterPolicyConfigurationResponse.newInstance(); + } + + @Override + public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { + ArrayList result = + new ArrayList(); + for (SubClusterPolicyConfiguration policy : policies.values()) { + result.add(policy); + } + return GetSubClusterPoliciesConfigurationsResponse.newInstance(result); + } + + @Override + public Version getCurrentVersion() { + return null; + } + + @Override + public Version loadVersion() { + return null; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java index 404521b522d..8cb84f3070e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.Records; @Private @Unstable public abstract class GetSubClusterPoliciesConfigurationsRequest { - public GetSubClusterPoliciesConfigurationsRequest newInstance() { + public static GetSubClusterPoliciesConfigurationsRequest newInstance() { return Records.newRecord(GetSubClusterPoliciesConfigurationsRequest.class); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java index 6554d687891..2eaeb512c10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java @@ -36,7 +36,7 @@ public abstract class GetSubClusterPoliciesConfigurationsResponse { @Private @Unstable - public GetSubClusterPoliciesConfigurationsResponse newInstance( + public static GetSubClusterPoliciesConfigurationsResponse newInstance( List policyConfigurations) { GetSubClusterPoliciesConfigurationsResponse response = Records.newRecord(GetSubClusterPoliciesConfigurationsResponse.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java index 7b7d8c494ba..c3f49e1e486 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java @@ -33,7 +33,8 @@ public abstract class GetSubClusterPolicyConfigurationRequest { @Private @Unstable - public GetSubClusterPolicyConfigurationRequest newInstance(String queueName) { + public static GetSubClusterPolicyConfigurationRequest newInstance( + String queueName) { GetSubClusterPolicyConfigurationRequest request = Records.newRecord(GetSubClusterPolicyConfigurationRequest.class); request.setQueue(queueName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java index 11a46e00d72..350b2393013 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java @@ -34,7 +34,7 @@ public abstract class GetSubClusterPolicyConfigurationResponse { @Private @Unstable - public GetSubClusterPolicyConfigurationResponse newInstance( + public static GetSubClusterPolicyConfigurationResponse newInstance( SubClusterPolicyConfiguration policy) { GetSubClusterPolicyConfigurationResponse response = Records.newRecord(GetSubClusterPolicyConfigurationResponse.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java index 06d539996b2..743ad0ebddd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java @@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.util.Records; public abstract class SetSubClusterPolicyConfigurationRequest { @Private @Unstable - public SetSubClusterPolicyConfigurationRequest newInstance( + public static SetSubClusterPolicyConfigurationRequest newInstance( SubClusterPolicyConfiguration policy) { SetSubClusterPolicyConfigurationRequest request = Records.newRecord(SetSubClusterPolicyConfigurationRequest.class); @@ -40,24 +40,6 @@ public abstract class SetSubClusterPolicyConfigurationRequest { return request; } - /** - * Get the name of the queue for which we are configuring a policy. - * - * @return the name of the queue - */ - @Public - @Unstable - public abstract String getQueue(); - - /** - * Sets the name of the queue for which we are configuring a policy. - * - * @param queueName the name of the queue - */ - @Private - @Unstable - public abstract void setQueue(String queueName); - /** * Get the policy configuration assigned to the queue. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java index 33c404348b0..401e984983e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java @@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.util.Records; @Private @Unstable public abstract class SetSubClusterPolicyConfigurationResponse { - public SetSubClusterPolicyConfigurationResponse newInstance() { + public static SetSubClusterPolicyConfigurationResponse newInstance() { return Records.newRecord(SetSubClusterPolicyConfigurationResponse.class); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java index bc12acb4062..28391393f76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java @@ -29,8 +29,8 @@ import java.nio.ByteBuffer; /** * {@link SubClusterPolicyConfiguration} is a class that represents a - * configuration of a policy. It contains a policy type (resolve to a class - * name) and its params as an opaque {@link ByteBuffer}. + * configuration of a policy. For a single queue, it contains a policy type + * (resolve to a class name) and its params as an opaque {@link ByteBuffer}. * * Note: by design the params are an opaque ByteBuffer, this allows for enough * flexibility to evolve the policies without impacting the protocols to/from @@ -42,15 +42,34 @@ public abstract class SubClusterPolicyConfiguration { @Private @Unstable - public static SubClusterPolicyConfiguration newInstance(String policyType, - ByteBuffer policyParams) { + public static SubClusterPolicyConfiguration newInstance(String queue, + String policyType, ByteBuffer policyParams) { SubClusterPolicyConfiguration policy = Records.newRecord(SubClusterPolicyConfiguration.class); + policy.setQueue(queue); policy.setType(policyType); policy.setParams(policyParams); return policy; } + /** + * Get the name of the queue for which we are configuring a policy. + * + * @return the name of the queue + */ + @Public + @Unstable + public abstract String getQueue(); + + /** + * Sets the name of the queue for which we are configuring a policy. + * + * @param queueName the name of the queue + */ + @Private + @Unstable + public abstract void setQueue(String queueName); + /** * Get the type of the policy. This could be random, round-robin, load-based, * etc. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java index 865d0c4bc91..585ba81df3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java @@ -108,6 +108,10 @@ public class GetApplicationHomeSubClusterRequestPBImpl public ApplicationId getApplicationId() { GetApplicationHomeSubClusterRequestProtoOrBuilder p = viaProto ? proto : builder; + if (applicationId != null) { + return applicationId; + } + if (!p.hasApplicationId()) { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java index 5e29bd53e42..7b7f89dba84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java @@ -106,23 +106,6 @@ public class SetSubClusterPolicyConfigurationRequestPBImpl return TextFormat.shortDebugString(getProto()); } - @Override - public String getQueue() { - SetSubClusterPolicyConfigurationRequestProtoOrBuilder p = - viaProto ? proto : builder; - return p.getQueue(); - } - - @Override - public void setQueue(String queueName) { - maybeInitBuilder(); - if (queueName == null) { - builder.clearQueue(); - return; - } - builder.setQueue(queueName); - } - @Override public SubClusterPolicyConfiguration getPolicyConfiguration() { SetSubClusterPolicyConfigurationRequestProtoOrBuilder p = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java index fe9d9db202b..305a8d32232 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java @@ -86,6 +86,23 @@ public class SubClusterPolicyConfigurationPBImpl return TextFormat.shortDebugString(getProto()); } + @Override + public String getQueue() { + SubClusterPolicyConfigurationProtoOrBuilder p = viaProto ? proto : builder; + return p.getQueue(); + } + + @Override + public void setQueue(String queueName) { + maybeInitBuilder(); + if (queueName == null) { + builder.clearType(); + return; + } + builder.setQueue(queueName); + + } + @Override public String getType() { SubClusterPolicyConfigurationProtoOrBuilder p = viaProto ? proto : builder; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto index 3f1cee94f40..11f786fdc3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto @@ -136,8 +136,9 @@ message DeleteApplicationHomeSubClusterResponseProto { } message SubClusterPolicyConfigurationProto { - optional string type = 1; - optional bytes params = 2; + optional string queue = 1; + optional string type = 2; + optional bytes params = 3; } message GetSubClusterPolicyConfigurationRequestProto { @@ -149,8 +150,7 @@ message GetSubClusterPolicyConfigurationResponseProto { } message SetSubClusterPolicyConfigurationRequestProto { - optional string queue = 1; - optional SubClusterPolicyConfigurationProto policy_configuration = 2; + optional SubClusterPolicyConfigurationProto policy_configuration = 1; } message SetSubClusterPolicyConfigurationResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index c76a485c2b7..165dd78ee4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -18,18 +18,39 @@ package org.apache.hadoop.yarn.server.federation.store.impl; import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.util.MonotonicClock; import org.junit.After; import org.junit.Assert; @@ -42,20 +63,21 @@ import org.junit.Test; public abstract class FederationStateStoreBaseTest { private static final MonotonicClock CLOCK = new MonotonicClock(); + private FederationStateStore stateStore = createStateStore(); - private FederationMembershipStateStore stateStore; + protected abstract FederationStateStore createStateStore(); @Before - public void before() throws IOException { - stateStore = getCleanStateStore(); + public void before() throws IOException, YarnException { + stateStore.init(new Configuration()); } @After - public void after() { - stateStore = null; + public void after() throws Exception { + stateStore.close(); } - protected abstract FederationMembershipStateStore getCleanStateStore(); + // Test FederationMembershipStateStore @Test public void testRegisterSubCluster() throws Exception { @@ -72,10 +94,7 @@ public abstract class FederationStateStoreBaseTest { @Test public void testDeregisterSubCluster() throws Exception { SubClusterId subClusterId = SubClusterId.newInstance("SC"); - SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); - - stateStore.registerSubCluster( - SubClusterRegisterRequest.newInstance(subClusterInfo)); + registerSubCluster(subClusterId); SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED); @@ -105,9 +124,7 @@ public abstract class FederationStateStoreBaseTest { SubClusterId subClusterId = SubClusterId.newInstance("SC"); SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); - - stateStore.registerSubCluster( - SubClusterRegisterRequest.newInstance(subClusterInfo)); + registerSubCluster(subClusterId); GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterId); @@ -167,10 +184,7 @@ public abstract class FederationStateStoreBaseTest { @Test public void testSubClusterHeartbeat() throws Exception { SubClusterId subClusterId = SubClusterId.newInstance("SC"); - SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); - - stateStore.registerSubCluster( - SubClusterRegisterRequest.newInstance(subClusterInfo)); + registerSubCluster(subClusterId); SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest .newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability"); @@ -196,6 +210,271 @@ public abstract class FederationStateStoreBaseTest { } } + // Test FederationApplicationHomeSubClusterStore + + @Test + public void testAddApplicationHomeSubClusterMap() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 1); + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + ApplicationHomeSubCluster ahsc = + ApplicationHomeSubCluster.newInstance(appId, subClusterId); + + AddApplicationHomeSubClusterRequest request = + AddApplicationHomeSubClusterRequest.newInstance(ahsc); + AddApplicationHomeSubClusterResponse response = + stateStore.addApplicationHomeSubClusterMap(request); + + Assert.assertNotNull(response); + Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId)); + + } + + @Test + public void testAddApplicationHomeSubClusterMapAppAlreadyExists() + throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 1); + SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); + addApplicationHomeSC(appId, subClusterId1); + + SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); + ApplicationHomeSubCluster ahsc2 = + ApplicationHomeSubCluster.newInstance(appId, subClusterId2); + + try { + stateStore.addApplicationHomeSubClusterMap( + AddApplicationHomeSubClusterRequest.newInstance(ahsc2)); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId.toString() + " already exists")); + } + + Assert.assertEquals(subClusterId1, queryApplicationHomeSC(appId)); + + } + + @Test + public void testDeleteApplicationHomeSubClusterMap() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 1); + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + addApplicationHomeSC(appId, subClusterId); + + DeleteApplicationHomeSubClusterRequest delRequest = + DeleteApplicationHomeSubClusterRequest.newInstance(appId); + + DeleteApplicationHomeSubClusterResponse response = + stateStore.deleteApplicationHomeSubClusterMap(delRequest); + + Assert.assertNotNull(response); + try { + queryApplicationHomeSC(appId); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId + " does not exist")); + } + + } + + @Test + public void testDeleteApplicationHomeSubClusterMapUnknownApp() + throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 1); + DeleteApplicationHomeSubClusterRequest delRequest = + DeleteApplicationHomeSubClusterRequest.newInstance(appId); + + try { + stateStore.deleteApplicationHomeSubClusterMap(delRequest); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId.toString() + " does not exist")); + } + } + + @Test + public void testGetApplicationHomeSubClusterMap() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 1); + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + addApplicationHomeSC(appId, subClusterId); + + GetApplicationHomeSubClusterRequest getRequest = + GetApplicationHomeSubClusterRequest.newInstance(appId); + + GetApplicationHomeSubClusterResponse result = + stateStore.getApplicationHomeSubClusterMap(getRequest); + + Assert.assertEquals(appId, + result.getApplicationHomeSubCluster().getApplicationId()); + Assert.assertEquals(subClusterId, + result.getApplicationHomeSubCluster().getHomeSubCluster()); + } + + @Test + public void testGetApplicationHomeSubClusterMapUnknownApp() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 1); + GetApplicationHomeSubClusterRequest request = + GetApplicationHomeSubClusterRequest.newInstance(appId); + + try { + stateStore.getApplicationHomeSubClusterMap(request); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId.toString() + " does not exist")); + } + } + + @Test + public void testGetApplicationsHomeSubClusterMap() throws Exception { + ApplicationId appId1 = ApplicationId.newInstance(1, 1); + SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); + ApplicationHomeSubCluster ahsc1 = + ApplicationHomeSubCluster.newInstance(appId1, subClusterId1); + + ApplicationId appId2 = ApplicationId.newInstance(1, 2); + SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); + ApplicationHomeSubCluster ahsc2 = + ApplicationHomeSubCluster.newInstance(appId2, subClusterId2); + + addApplicationHomeSC(appId1, subClusterId1); + addApplicationHomeSC(appId2, subClusterId2); + + GetApplicationsHomeSubClusterRequest getRequest = + GetApplicationsHomeSubClusterRequest.newInstance(); + + GetApplicationsHomeSubClusterResponse result = + stateStore.getApplicationsHomeSubClusterMap(getRequest); + + Assert.assertEquals(2, result.getAppsHomeSubClusters().size()); + Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc1)); + Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2)); + } + + @Test + public void testUpdateApplicationHomeSubClusterMap() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 1); + SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); + addApplicationHomeSC(appId, subClusterId1); + + SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); + ApplicationHomeSubCluster ahscUpdate = + ApplicationHomeSubCluster.newInstance(appId, subClusterId2); + + UpdateApplicationHomeSubClusterRequest updateRequest = + UpdateApplicationHomeSubClusterRequest.newInstance(ahscUpdate); + + UpdateApplicationHomeSubClusterResponse response = + stateStore.updateApplicationHomeSubClusterMap(updateRequest); + + Assert.assertNotNull(response); + + Assert.assertEquals(subClusterId2, queryApplicationHomeSC(appId)); + } + + @Test + public void testUpdateApplicationHomeSubClusterMapUnknownApp() + throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 1); + SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); + ApplicationHomeSubCluster ahsc = + ApplicationHomeSubCluster.newInstance(appId, subClusterId1); + + UpdateApplicationHomeSubClusterRequest updateRequest = + UpdateApplicationHomeSubClusterRequest.newInstance(ahsc); + + try { + stateStore.updateApplicationHomeSubClusterMap((updateRequest)); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId.toString() + " does not exist")); + } + } + + // Test FederationPolicyStore + + @Test + public void testSetPolicyConfiguration() throws Exception { + SetSubClusterPolicyConfigurationRequest request = + SetSubClusterPolicyConfigurationRequest + .newInstance(createSCPolicyConf("Queue", "PolicyType")); + + SetSubClusterPolicyConfigurationResponse result = + stateStore.setPolicyConfiguration(request); + + Assert.assertNotNull(result); + Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"), + queryPolicy("Queue")); + + } + + @Test + public void testSetPolicyConfigurationUpdateExisting() throws Exception { + setPolicyConf("Queue", "PolicyType1"); + + SetSubClusterPolicyConfigurationRequest request2 = + SetSubClusterPolicyConfigurationRequest + .newInstance(createSCPolicyConf("Queue", "PolicyType2")); + SetSubClusterPolicyConfigurationResponse result = + stateStore.setPolicyConfiguration(request2); + + Assert.assertNotNull(result); + Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType2"), + queryPolicy("Queue")); + } + + @Test + public void testGetPolicyConfiguration() throws Exception { + setPolicyConf("Queue", "PolicyType"); + + GetSubClusterPolicyConfigurationRequest getRequest = + GetSubClusterPolicyConfigurationRequest.newInstance("Queue"); + GetSubClusterPolicyConfigurationResponse result = + stateStore.getPolicyConfiguration(getRequest); + + Assert.assertNotNull(result); + Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"), + result.getPolicyConfiguration()); + + } + + @Test + public void testGetPolicyConfigurationUnknownQueue() throws Exception { + + GetSubClusterPolicyConfigurationRequest request = + GetSubClusterPolicyConfigurationRequest.newInstance("Queue"); + try { + stateStore.getPolicyConfiguration(request); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Policy for queue Queue does not exist")); + } + } + + @Test + public void testGetPoliciesConfigurations() throws Exception { + setPolicyConf("Queue1", "PolicyType1"); + setPolicyConf("Queue2", "PolicyType2"); + + GetSubClusterPoliciesConfigurationsResponse response = + stateStore.getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest.newInstance()); + + Assert.assertNotNull(response); + Assert.assertNotNull(response.getPoliciesConfigs()); + + Assert.assertEquals(2, response.getPoliciesConfigs().size()); + + Assert.assertTrue(response.getPoliciesConfigs() + .contains(createSCPolicyConf("Queue1", "PolicyType1"))); + Assert.assertTrue(response.getPoliciesConfigs() + .contains(createSCPolicyConf("Queue2", "PolicyType2"))); + } + + // Convenience methods + private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) { String amRMAddress = "1.2.3.4:1"; @@ -208,6 +487,37 @@ public abstract class FederationStateStoreBaseTest { CLOCK.getTime(), "cabability"); } + private SubClusterPolicyConfiguration createSCPolicyConf(String queueName, + String policyType) { + return SubClusterPolicyConfiguration.newInstance(queueName, policyType, + ByteBuffer.allocate(1)); + } + + private void addApplicationHomeSC(ApplicationId appId, + SubClusterId subClusterId) throws YarnException { + ApplicationHomeSubCluster ahsc = + ApplicationHomeSubCluster.newInstance(appId, subClusterId); + AddApplicationHomeSubClusterRequest request = + AddApplicationHomeSubClusterRequest.newInstance(ahsc); + stateStore.addApplicationHomeSubClusterMap(request); + } + + private void setPolicyConf(String queue, String policyType) + throws YarnException { + SetSubClusterPolicyConfigurationRequest request = + SetSubClusterPolicyConfigurationRequest + .newInstance(createSCPolicyConf(queue, policyType)); + stateStore.setPolicyConfiguration(request); + } + + private void registerSubCluster(SubClusterId subClusterId) + throws YarnException { + + SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo)); + } + private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId) throws YarnException { GetSubClusterInfoRequest request = @@ -215,4 +525,25 @@ public abstract class FederationStateStoreBaseTest { return stateStore.getSubCluster(request).getSubClusterInfo(); } + private SubClusterId queryApplicationHomeSC(ApplicationId appId) + throws YarnException { + GetApplicationHomeSubClusterRequest request = + GetApplicationHomeSubClusterRequest.newInstance(appId); + + GetApplicationHomeSubClusterResponse response = + stateStore.getApplicationHomeSubClusterMap(request); + + return response.getApplicationHomeSubCluster().getHomeSubCluster(); + } + + private SubClusterPolicyConfiguration queryPolicy(String queue) + throws YarnException { + GetSubClusterPolicyConfigurationRequest request = + GetSubClusterPolicyConfigurationRequest.newInstance(queue); + + GetSubClusterPolicyConfigurationResponse result = + stateStore.getPolicyConfiguration(request); + return result.getPolicyConfiguration(); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index 9396edac9ad..74404c7457e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -17,7 +17,7 @@ package org.apache.hadoop.yarn.server.federation.store.impl; -import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; /** * Unit tests for MemoryFederationStateStore. @@ -26,7 +26,7 @@ public class TestMemoryFederationStateStore extends FederationStateStoreBaseTest { @Override - protected FederationMembershipStateStore getCleanStateStore() { + protected FederationStateStore createStateStore() { return new MemoryFederationStateStore(); } }