YARN-5407. In-memory based implementation of the FederationApplicationStateStore/FederationPolicyStateStore. (Ellen Hui via Subru)
This commit is contained in:
parent
268389d021
commit
b747d59f41
|
@ -20,34 +20,71 @@ package org.apache.hadoop.yarn.server.federation.store.impl;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
|
||||
/**
|
||||
* In-memory implementation of FederationMembershipStateStore.
|
||||
* In-memory implementation of {@link FederationStateStore}.
|
||||
*/
|
||||
public class MemoryFederationStateStore
|
||||
implements FederationMembershipStateStore {
|
||||
public class MemoryFederationStateStore implements FederationStateStore {
|
||||
|
||||
private Map<SubClusterId, SubClusterInfo> membership;
|
||||
private Map<ApplicationId, SubClusterId> applications;
|
||||
private Map<String, SubClusterPolicyConfiguration> policies;
|
||||
|
||||
private final Map<SubClusterId, SubClusterInfo> membership =
|
||||
new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
|
||||
private final MonotonicClock clock = new MonotonicClock();
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf) {
|
||||
membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
|
||||
applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
|
||||
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
membership = null;
|
||||
applications = null;
|
||||
policies = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubClusterRegisterResponse registerSubCluster(
|
||||
SubClusterRegisterRequest request) throws YarnException {
|
||||
|
@ -116,4 +153,113 @@ public class MemoryFederationStateStore
|
|||
return GetSubClustersInfoResponse.newInstance(result);
|
||||
}
|
||||
|
||||
// FederationApplicationHomeSubClusterStore methods
|
||||
|
||||
@Override
|
||||
public AddApplicationHomeSubClusterResponse addApplicationHomeSubClusterMap(
|
||||
AddApplicationHomeSubClusterRequest request) throws YarnException {
|
||||
ApplicationId appId =
|
||||
request.getApplicationHomeSubCluster().getApplicationId();
|
||||
if (applications.containsKey(appId)) {
|
||||
throw new YarnException("Application " + appId + " already exists");
|
||||
}
|
||||
|
||||
applications.put(appId,
|
||||
request.getApplicationHomeSubCluster().getHomeSubCluster());
|
||||
return AddApplicationHomeSubClusterResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubClusterMap(
|
||||
UpdateApplicationHomeSubClusterRequest request) throws YarnException {
|
||||
ApplicationId appId =
|
||||
request.getApplicationHomeSubCluster().getApplicationId();
|
||||
if (!applications.containsKey(appId)) {
|
||||
throw new YarnException("Application " + appId + " does not exist");
|
||||
}
|
||||
|
||||
applications.put(appId,
|
||||
request.getApplicationHomeSubCluster().getHomeSubCluster());
|
||||
return UpdateApplicationHomeSubClusterResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationHomeSubClusterResponse getApplicationHomeSubClusterMap(
|
||||
GetApplicationHomeSubClusterRequest request) throws YarnException {
|
||||
ApplicationId appId = request.getApplicationId();
|
||||
if (!applications.containsKey(appId)) {
|
||||
throw new YarnException("Application " + appId + " does not exist");
|
||||
}
|
||||
|
||||
return GetApplicationHomeSubClusterResponse.newInstance(
|
||||
ApplicationHomeSubCluster.newInstance(appId, applications.get(appId)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubClusterMap(
|
||||
GetApplicationsHomeSubClusterRequest request) throws YarnException {
|
||||
List<ApplicationHomeSubCluster> result =
|
||||
new ArrayList<ApplicationHomeSubCluster>();
|
||||
for (Entry<ApplicationId, SubClusterId> e : applications.entrySet()) {
|
||||
result
|
||||
.add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue()));
|
||||
}
|
||||
|
||||
GetApplicationsHomeSubClusterResponse.newInstance(result);
|
||||
return GetApplicationsHomeSubClusterResponse.newInstance(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubClusterMap(
|
||||
DeleteApplicationHomeSubClusterRequest request) throws YarnException {
|
||||
ApplicationId appId = request.getApplicationId();
|
||||
if (!applications.containsKey(appId)) {
|
||||
throw new YarnException("Application " + appId + " does not exist");
|
||||
}
|
||||
|
||||
applications.remove(appId);
|
||||
return DeleteApplicationHomeSubClusterResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
|
||||
GetSubClusterPolicyConfigurationRequest request) throws YarnException {
|
||||
String queue = request.getQueue();
|
||||
if (!policies.containsKey(queue)) {
|
||||
throw new YarnException("Policy for queue " + queue + " does not exist");
|
||||
}
|
||||
|
||||
return GetSubClusterPolicyConfigurationResponse
|
||||
.newInstance(policies.get(queue));
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
|
||||
SetSubClusterPolicyConfigurationRequest request) throws YarnException {
|
||||
policies.put(request.getPolicyConfiguration().getQueue(),
|
||||
request.getPolicyConfiguration());
|
||||
return SetSubClusterPolicyConfigurationResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
|
||||
GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
|
||||
ArrayList<SubClusterPolicyConfiguration> result =
|
||||
new ArrayList<SubClusterPolicyConfiguration>();
|
||||
for (SubClusterPolicyConfiguration policy : policies.values()) {
|
||||
result.add(policy);
|
||||
}
|
||||
return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version getCurrentVersion() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version loadVersion() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
@Private
|
||||
@Unstable
|
||||
public abstract class GetSubClusterPoliciesConfigurationsRequest {
|
||||
public GetSubClusterPoliciesConfigurationsRequest newInstance() {
|
||||
public static GetSubClusterPoliciesConfigurationsRequest newInstance() {
|
||||
return Records.newRecord(GetSubClusterPoliciesConfigurationsRequest.class);
|
||||
}
|
||||
}
|
|
@ -36,7 +36,7 @@ public abstract class GetSubClusterPoliciesConfigurationsResponse {
|
|||
|
||||
@Private
|
||||
@Unstable
|
||||
public GetSubClusterPoliciesConfigurationsResponse newInstance(
|
||||
public static GetSubClusterPoliciesConfigurationsResponse newInstance(
|
||||
List<SubClusterPolicyConfiguration> policyConfigurations) {
|
||||
GetSubClusterPoliciesConfigurationsResponse response =
|
||||
Records.newRecord(GetSubClusterPoliciesConfigurationsResponse.class);
|
||||
|
|
|
@ -33,7 +33,8 @@ public abstract class GetSubClusterPolicyConfigurationRequest {
|
|||
|
||||
@Private
|
||||
@Unstable
|
||||
public GetSubClusterPolicyConfigurationRequest newInstance(String queueName) {
|
||||
public static GetSubClusterPolicyConfigurationRequest newInstance(
|
||||
String queueName) {
|
||||
GetSubClusterPolicyConfigurationRequest request =
|
||||
Records.newRecord(GetSubClusterPolicyConfigurationRequest.class);
|
||||
request.setQueue(queueName);
|
||||
|
|
|
@ -34,7 +34,7 @@ public abstract class GetSubClusterPolicyConfigurationResponse {
|
|||
|
||||
@Private
|
||||
@Unstable
|
||||
public GetSubClusterPolicyConfigurationResponse newInstance(
|
||||
public static GetSubClusterPolicyConfigurationResponse newInstance(
|
||||
SubClusterPolicyConfiguration policy) {
|
||||
GetSubClusterPolicyConfigurationResponse response =
|
||||
Records.newRecord(GetSubClusterPolicyConfigurationResponse.class);
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
public abstract class SetSubClusterPolicyConfigurationRequest {
|
||||
@Private
|
||||
@Unstable
|
||||
public SetSubClusterPolicyConfigurationRequest newInstance(
|
||||
public static SetSubClusterPolicyConfigurationRequest newInstance(
|
||||
SubClusterPolicyConfiguration policy) {
|
||||
SetSubClusterPolicyConfigurationRequest request =
|
||||
Records.newRecord(SetSubClusterPolicyConfigurationRequest.class);
|
||||
|
@ -40,24 +40,6 @@ public abstract class SetSubClusterPolicyConfigurationRequest {
|
|||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the name of the queue for which we are configuring a policy.
|
||||
*
|
||||
* @return the name of the queue
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getQueue();
|
||||
|
||||
/**
|
||||
* Sets the name of the queue for which we are configuring a policy.
|
||||
*
|
||||
* @param queueName the name of the queue
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setQueue(String queueName);
|
||||
|
||||
/**
|
||||
* Get the policy configuration assigned to the queue.
|
||||
*
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
@Private
|
||||
@Unstable
|
||||
public abstract class SetSubClusterPolicyConfigurationResponse {
|
||||
public SetSubClusterPolicyConfigurationResponse newInstance() {
|
||||
public static SetSubClusterPolicyConfigurationResponse newInstance() {
|
||||
return Records.newRecord(SetSubClusterPolicyConfigurationResponse.class);
|
||||
}
|
||||
}
|
|
@ -29,8 +29,8 @@ import java.nio.ByteBuffer;
|
|||
|
||||
/**
|
||||
* {@link SubClusterPolicyConfiguration} is a class that represents a
|
||||
* configuration of a policy. It contains a policy type (resolve to a class
|
||||
* name) and its params as an opaque {@link ByteBuffer}.
|
||||
* configuration of a policy. For a single queue, it contains a policy type
|
||||
* (resolve to a class name) and its params as an opaque {@link ByteBuffer}.
|
||||
*
|
||||
* Note: by design the params are an opaque ByteBuffer, this allows for enough
|
||||
* flexibility to evolve the policies without impacting the protocols to/from
|
||||
|
@ -42,15 +42,34 @@ public abstract class SubClusterPolicyConfiguration {
|
|||
|
||||
@Private
|
||||
@Unstable
|
||||
public static SubClusterPolicyConfiguration newInstance(String policyType,
|
||||
ByteBuffer policyParams) {
|
||||
public static SubClusterPolicyConfiguration newInstance(String queue,
|
||||
String policyType, ByteBuffer policyParams) {
|
||||
SubClusterPolicyConfiguration policy =
|
||||
Records.newRecord(SubClusterPolicyConfiguration.class);
|
||||
policy.setQueue(queue);
|
||||
policy.setType(policyType);
|
||||
policy.setParams(policyParams);
|
||||
return policy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the name of the queue for which we are configuring a policy.
|
||||
*
|
||||
* @return the name of the queue
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getQueue();
|
||||
|
||||
/**
|
||||
* Sets the name of the queue for which we are configuring a policy.
|
||||
*
|
||||
* @param queueName the name of the queue
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setQueue(String queueName);
|
||||
|
||||
/**
|
||||
* Get the type of the policy. This could be random, round-robin, load-based,
|
||||
* etc.
|
||||
|
|
|
@ -108,6 +108,10 @@ public class GetApplicationHomeSubClusterRequestPBImpl
|
|||
public ApplicationId getApplicationId() {
|
||||
GetApplicationHomeSubClusterRequestProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
if (applicationId != null) {
|
||||
return applicationId;
|
||||
}
|
||||
|
||||
if (!p.hasApplicationId()) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -106,23 +106,6 @@ public class SetSubClusterPolicyConfigurationRequestPBImpl
|
|||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueue() {
|
||||
SetSubClusterPolicyConfigurationRequestProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getQueue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setQueue(String queueName) {
|
||||
maybeInitBuilder();
|
||||
if (queueName == null) {
|
||||
builder.clearQueue();
|
||||
return;
|
||||
}
|
||||
builder.setQueue(queueName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubClusterPolicyConfiguration getPolicyConfiguration() {
|
||||
SetSubClusterPolicyConfigurationRequestProtoOrBuilder p =
|
||||
|
|
|
@ -86,6 +86,23 @@ public class SubClusterPolicyConfigurationPBImpl
|
|||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueue() {
|
||||
SubClusterPolicyConfigurationProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getQueue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setQueue(String queueName) {
|
||||
maybeInitBuilder();
|
||||
if (queueName == null) {
|
||||
builder.clearType();
|
||||
return;
|
||||
}
|
||||
builder.setQueue(queueName);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
SubClusterPolicyConfigurationProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
|
|
@ -136,8 +136,9 @@ message DeleteApplicationHomeSubClusterResponseProto {
|
|||
}
|
||||
|
||||
message SubClusterPolicyConfigurationProto {
|
||||
optional string type = 1;
|
||||
optional bytes params = 2;
|
||||
optional string queue = 1;
|
||||
optional string type = 2;
|
||||
optional bytes params = 3;
|
||||
}
|
||||
|
||||
message GetSubClusterPolicyConfigurationRequestProto {
|
||||
|
@ -149,8 +150,7 @@ message GetSubClusterPolicyConfigurationResponseProto {
|
|||
}
|
||||
|
||||
message SetSubClusterPolicyConfigurationRequestProto {
|
||||
optional string queue = 1;
|
||||
optional SubClusterPolicyConfigurationProto policy_configuration = 2;
|
||||
optional SubClusterPolicyConfigurationProto policy_configuration = 1;
|
||||
}
|
||||
|
||||
message SetSubClusterPolicyConfigurationResponseProto {
|
||||
|
|
|
@ -18,18 +18,39 @@
|
|||
package org.apache.hadoop.yarn.server.federation.store.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -42,20 +63,21 @@ import org.junit.Test;
|
|||
public abstract class FederationStateStoreBaseTest {
|
||||
|
||||
private static final MonotonicClock CLOCK = new MonotonicClock();
|
||||
private FederationStateStore stateStore = createStateStore();
|
||||
|
||||
private FederationMembershipStateStore stateStore;
|
||||
protected abstract FederationStateStore createStateStore();
|
||||
|
||||
@Before
|
||||
public void before() throws IOException {
|
||||
stateStore = getCleanStateStore();
|
||||
public void before() throws IOException, YarnException {
|
||||
stateStore.init(new Configuration());
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
stateStore = null;
|
||||
public void after() throws Exception {
|
||||
stateStore.close();
|
||||
}
|
||||
|
||||
protected abstract FederationMembershipStateStore getCleanStateStore();
|
||||
// Test FederationMembershipStateStore
|
||||
|
||||
@Test
|
||||
public void testRegisterSubCluster() throws Exception {
|
||||
|
@ -72,10 +94,7 @@ public abstract class FederationStateStoreBaseTest {
|
|||
@Test
|
||||
public void testDeregisterSubCluster() throws Exception {
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
|
||||
|
||||
stateStore.registerSubCluster(
|
||||
SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||
registerSubCluster(subClusterId);
|
||||
|
||||
SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
|
||||
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
|
||||
|
@ -105,9 +124,7 @@ public abstract class FederationStateStoreBaseTest {
|
|||
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
|
||||
|
||||
stateStore.registerSubCluster(
|
||||
SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||
registerSubCluster(subClusterId);
|
||||
|
||||
GetSubClusterInfoRequest request =
|
||||
GetSubClusterInfoRequest.newInstance(subClusterId);
|
||||
|
@ -167,10 +184,7 @@ public abstract class FederationStateStoreBaseTest {
|
|||
@Test
|
||||
public void testSubClusterHeartbeat() throws Exception {
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
|
||||
|
||||
stateStore.registerSubCluster(
|
||||
SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||
registerSubCluster(subClusterId);
|
||||
|
||||
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
|
||||
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
|
||||
|
@ -196,6 +210,271 @@ public abstract class FederationStateStoreBaseTest {
|
|||
}
|
||||
}
|
||||
|
||||
// Test FederationApplicationHomeSubClusterStore
|
||||
|
||||
@Test
|
||||
public void testAddApplicationHomeSubClusterMap() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
ApplicationHomeSubCluster ahsc =
|
||||
ApplicationHomeSubCluster.newInstance(appId, subClusterId);
|
||||
|
||||
AddApplicationHomeSubClusterRequest request =
|
||||
AddApplicationHomeSubClusterRequest.newInstance(ahsc);
|
||||
AddApplicationHomeSubClusterResponse response =
|
||||
stateStore.addApplicationHomeSubClusterMap(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddApplicationHomeSubClusterMapAppAlreadyExists()
|
||||
throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
|
||||
addApplicationHomeSC(appId, subClusterId1);
|
||||
|
||||
SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
|
||||
ApplicationHomeSubCluster ahsc2 =
|
||||
ApplicationHomeSubCluster.newInstance(appId, subClusterId2);
|
||||
|
||||
try {
|
||||
stateStore.addApplicationHomeSubClusterMap(
|
||||
AddApplicationHomeSubClusterRequest.newInstance(ahsc2));
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.startsWith("Application " + appId.toString() + " already exists"));
|
||||
}
|
||||
|
||||
Assert.assertEquals(subClusterId1, queryApplicationHomeSC(appId));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteApplicationHomeSubClusterMap() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
addApplicationHomeSC(appId, subClusterId);
|
||||
|
||||
DeleteApplicationHomeSubClusterRequest delRequest =
|
||||
DeleteApplicationHomeSubClusterRequest.newInstance(appId);
|
||||
|
||||
DeleteApplicationHomeSubClusterResponse response =
|
||||
stateStore.deleteApplicationHomeSubClusterMap(delRequest);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
try {
|
||||
queryApplicationHomeSC(appId);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.startsWith("Application " + appId + " does not exist"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteApplicationHomeSubClusterMapUnknownApp()
|
||||
throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
DeleteApplicationHomeSubClusterRequest delRequest =
|
||||
DeleteApplicationHomeSubClusterRequest.newInstance(appId);
|
||||
|
||||
try {
|
||||
stateStore.deleteApplicationHomeSubClusterMap(delRequest);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.startsWith("Application " + appId.toString() + " does not exist"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetApplicationHomeSubClusterMap() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
addApplicationHomeSC(appId, subClusterId);
|
||||
|
||||
GetApplicationHomeSubClusterRequest getRequest =
|
||||
GetApplicationHomeSubClusterRequest.newInstance(appId);
|
||||
|
||||
GetApplicationHomeSubClusterResponse result =
|
||||
stateStore.getApplicationHomeSubClusterMap(getRequest);
|
||||
|
||||
Assert.assertEquals(appId,
|
||||
result.getApplicationHomeSubCluster().getApplicationId());
|
||||
Assert.assertEquals(subClusterId,
|
||||
result.getApplicationHomeSubCluster().getHomeSubCluster());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetApplicationHomeSubClusterMapUnknownApp() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
GetApplicationHomeSubClusterRequest request =
|
||||
GetApplicationHomeSubClusterRequest.newInstance(appId);
|
||||
|
||||
try {
|
||||
stateStore.getApplicationHomeSubClusterMap(request);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.startsWith("Application " + appId.toString() + " does not exist"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetApplicationsHomeSubClusterMap() throws Exception {
|
||||
ApplicationId appId1 = ApplicationId.newInstance(1, 1);
|
||||
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
|
||||
ApplicationHomeSubCluster ahsc1 =
|
||||
ApplicationHomeSubCluster.newInstance(appId1, subClusterId1);
|
||||
|
||||
ApplicationId appId2 = ApplicationId.newInstance(1, 2);
|
||||
SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
|
||||
ApplicationHomeSubCluster ahsc2 =
|
||||
ApplicationHomeSubCluster.newInstance(appId2, subClusterId2);
|
||||
|
||||
addApplicationHomeSC(appId1, subClusterId1);
|
||||
addApplicationHomeSC(appId2, subClusterId2);
|
||||
|
||||
GetApplicationsHomeSubClusterRequest getRequest =
|
||||
GetApplicationsHomeSubClusterRequest.newInstance();
|
||||
|
||||
GetApplicationsHomeSubClusterResponse result =
|
||||
stateStore.getApplicationsHomeSubClusterMap(getRequest);
|
||||
|
||||
Assert.assertEquals(2, result.getAppsHomeSubClusters().size());
|
||||
Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc1));
|
||||
Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateApplicationHomeSubClusterMap() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
|
||||
addApplicationHomeSC(appId, subClusterId1);
|
||||
|
||||
SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
|
||||
ApplicationHomeSubCluster ahscUpdate =
|
||||
ApplicationHomeSubCluster.newInstance(appId, subClusterId2);
|
||||
|
||||
UpdateApplicationHomeSubClusterRequest updateRequest =
|
||||
UpdateApplicationHomeSubClusterRequest.newInstance(ahscUpdate);
|
||||
|
||||
UpdateApplicationHomeSubClusterResponse response =
|
||||
stateStore.updateApplicationHomeSubClusterMap(updateRequest);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
|
||||
Assert.assertEquals(subClusterId2, queryApplicationHomeSC(appId));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateApplicationHomeSubClusterMapUnknownApp()
|
||||
throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
|
||||
ApplicationHomeSubCluster ahsc =
|
||||
ApplicationHomeSubCluster.newInstance(appId, subClusterId1);
|
||||
|
||||
UpdateApplicationHomeSubClusterRequest updateRequest =
|
||||
UpdateApplicationHomeSubClusterRequest.newInstance(ahsc);
|
||||
|
||||
try {
|
||||
stateStore.updateApplicationHomeSubClusterMap((updateRequest));
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.startsWith("Application " + appId.toString() + " does not exist"));
|
||||
}
|
||||
}
|
||||
|
||||
// Test FederationPolicyStore
|
||||
|
||||
@Test
|
||||
public void testSetPolicyConfiguration() throws Exception {
|
||||
SetSubClusterPolicyConfigurationRequest request =
|
||||
SetSubClusterPolicyConfigurationRequest
|
||||
.newInstance(createSCPolicyConf("Queue", "PolicyType"));
|
||||
|
||||
SetSubClusterPolicyConfigurationResponse result =
|
||||
stateStore.setPolicyConfiguration(request);
|
||||
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"),
|
||||
queryPolicy("Queue"));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetPolicyConfigurationUpdateExisting() throws Exception {
|
||||
setPolicyConf("Queue", "PolicyType1");
|
||||
|
||||
SetSubClusterPolicyConfigurationRequest request2 =
|
||||
SetSubClusterPolicyConfigurationRequest
|
||||
.newInstance(createSCPolicyConf("Queue", "PolicyType2"));
|
||||
SetSubClusterPolicyConfigurationResponse result =
|
||||
stateStore.setPolicyConfiguration(request2);
|
||||
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType2"),
|
||||
queryPolicy("Queue"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetPolicyConfiguration() throws Exception {
|
||||
setPolicyConf("Queue", "PolicyType");
|
||||
|
||||
GetSubClusterPolicyConfigurationRequest getRequest =
|
||||
GetSubClusterPolicyConfigurationRequest.newInstance("Queue");
|
||||
GetSubClusterPolicyConfigurationResponse result =
|
||||
stateStore.getPolicyConfiguration(getRequest);
|
||||
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"),
|
||||
result.getPolicyConfiguration());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetPolicyConfigurationUnknownQueue() throws Exception {
|
||||
|
||||
GetSubClusterPolicyConfigurationRequest request =
|
||||
GetSubClusterPolicyConfigurationRequest.newInstance("Queue");
|
||||
try {
|
||||
stateStore.getPolicyConfiguration(request);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(
|
||||
e.getMessage().startsWith("Policy for queue Queue does not exist"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetPoliciesConfigurations() throws Exception {
|
||||
setPolicyConf("Queue1", "PolicyType1");
|
||||
setPolicyConf("Queue2", "PolicyType2");
|
||||
|
||||
GetSubClusterPoliciesConfigurationsResponse response =
|
||||
stateStore.getPoliciesConfigurations(
|
||||
GetSubClusterPoliciesConfigurationsRequest.newInstance());
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertNotNull(response.getPoliciesConfigs());
|
||||
|
||||
Assert.assertEquals(2, response.getPoliciesConfigs().size());
|
||||
|
||||
Assert.assertTrue(response.getPoliciesConfigs()
|
||||
.contains(createSCPolicyConf("Queue1", "PolicyType1")));
|
||||
Assert.assertTrue(response.getPoliciesConfigs()
|
||||
.contains(createSCPolicyConf("Queue2", "PolicyType2")));
|
||||
}
|
||||
|
||||
// Convenience methods
|
||||
|
||||
private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
|
||||
|
||||
String amRMAddress = "1.2.3.4:1";
|
||||
|
@ -208,6 +487,37 @@ public abstract class FederationStateStoreBaseTest {
|
|||
CLOCK.getTime(), "cabability");
|
||||
}
|
||||
|
||||
private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
|
||||
String policyType) {
|
||||
return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
|
||||
ByteBuffer.allocate(1));
|
||||
}
|
||||
|
||||
private void addApplicationHomeSC(ApplicationId appId,
|
||||
SubClusterId subClusterId) throws YarnException {
|
||||
ApplicationHomeSubCluster ahsc =
|
||||
ApplicationHomeSubCluster.newInstance(appId, subClusterId);
|
||||
AddApplicationHomeSubClusterRequest request =
|
||||
AddApplicationHomeSubClusterRequest.newInstance(ahsc);
|
||||
stateStore.addApplicationHomeSubClusterMap(request);
|
||||
}
|
||||
|
||||
private void setPolicyConf(String queue, String policyType)
|
||||
throws YarnException {
|
||||
SetSubClusterPolicyConfigurationRequest request =
|
||||
SetSubClusterPolicyConfigurationRequest
|
||||
.newInstance(createSCPolicyConf(queue, policyType));
|
||||
stateStore.setPolicyConfiguration(request);
|
||||
}
|
||||
|
||||
private void registerSubCluster(SubClusterId subClusterId)
|
||||
throws YarnException {
|
||||
|
||||
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
|
||||
stateStore.registerSubCluster(
|
||||
SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||
}
|
||||
|
||||
private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
|
||||
throws YarnException {
|
||||
GetSubClusterInfoRequest request =
|
||||
|
@ -215,4 +525,25 @@ public abstract class FederationStateStoreBaseTest {
|
|||
return stateStore.getSubCluster(request).getSubClusterInfo();
|
||||
}
|
||||
|
||||
private SubClusterId queryApplicationHomeSC(ApplicationId appId)
|
||||
throws YarnException {
|
||||
GetApplicationHomeSubClusterRequest request =
|
||||
GetApplicationHomeSubClusterRequest.newInstance(appId);
|
||||
|
||||
GetApplicationHomeSubClusterResponse response =
|
||||
stateStore.getApplicationHomeSubClusterMap(request);
|
||||
|
||||
return response.getApplicationHomeSubCluster().getHomeSubCluster();
|
||||
}
|
||||
|
||||
private SubClusterPolicyConfiguration queryPolicy(String queue)
|
||||
throws YarnException {
|
||||
GetSubClusterPolicyConfigurationRequest request =
|
||||
GetSubClusterPolicyConfigurationRequest.newInstance(queue);
|
||||
|
||||
GetSubClusterPolicyConfigurationResponse result =
|
||||
stateStore.getPolicyConfiguration(request);
|
||||
return result.getPolicyConfiguration();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.federation.store.impl;
|
||||
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||
|
||||
/**
|
||||
* Unit tests for MemoryFederationStateStore.
|
||||
|
@ -26,7 +26,7 @@ public class TestMemoryFederationStateStore
|
|||
extends FederationStateStoreBaseTest {
|
||||
|
||||
@Override
|
||||
protected FederationMembershipStateStore getCleanStateStore() {
|
||||
protected FederationStateStore createStateStore() {
|
||||
return new MemoryFederationStateStore();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue