{
+ TResult invoke(T input) throws Exception;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java
new file mode 100644
index 00000000000..39a46ec3e4c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.utils;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
new file mode 100644
index 00000000000..c179521ea6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.utils;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+
+/**
+ * Utility class for FederationStateStore unit tests.
+ */
+public class FederationStateStoreTestUtil {
+
+ private static final MonotonicClock CLOCK = new MonotonicClock();
+
+ public static final String SC_PREFIX = "SC-";
+ public static final String Q_PREFIX = "queue-";
+ public static final String POLICY_PREFIX = "policy-";
+
+ private FederationStateStore stateStore;
+
+ public FederationStateStoreTestUtil(FederationStateStore stateStore) {
+ this.stateStore = stateStore;
+ }
+
+ private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
+
+ String amRMAddress = "1.2.3.4:1";
+ String clientRMAddress = "1.2.3.4:2";
+ String rmAdminAddress = "1.2.3.4:3";
+ String webAppAddress = "1.2.3.4:4";
+
+ return SubClusterInfo.newInstance(subClusterId, amRMAddress,
+ clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW,
+ CLOCK.getTime(), "capability");
+ }
+
+ private void registerSubCluster(SubClusterId subClusterId)
+ throws YarnException {
+
+ SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
+ stateStore.registerSubCluster(
+ SubClusterRegisterRequest.newInstance(subClusterInfo));
+ }
+
+ public void registerSubClusters(int numSubClusters) throws YarnException {
+
+ for (int i = 0; i < numSubClusters; i++) {
+ registerSubCluster(SubClusterId.newInstance(SC_PREFIX + i));
+ }
+ }
+
+ private void addApplicationHomeSC(ApplicationId appId,
+ SubClusterId subClusterId) throws YarnException {
+ ApplicationHomeSubCluster ahsc =
+ ApplicationHomeSubCluster.newInstance(appId, subClusterId);
+ AddApplicationHomeSubClusterRequest request =
+ AddApplicationHomeSubClusterRequest.newInstance(ahsc);
+ stateStore.addApplicationHomeSubCluster(request);
+ }
+
+ public void addAppsHomeSC(long clusterTs, int numApps) throws YarnException {
+ for (int i = 0; i < numApps; i++) {
+ addApplicationHomeSC(ApplicationId.newInstance(clusterTs, i),
+ SubClusterId.newInstance(SC_PREFIX + i));
+ }
+ }
+
+ private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
+ String policyType) {
+ return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
+ ByteBuffer.allocate(1));
+ }
+
+ private void setPolicyConf(String queue, String policyType)
+ throws YarnException {
+ SetSubClusterPolicyConfigurationRequest request =
+ SetSubClusterPolicyConfigurationRequest
+ .newInstance(createSCPolicyConf(queue, policyType));
+ stateStore.setPolicyConfiguration(request);
+ }
+
+ public void addPolicyConfigs(int numQueues) throws YarnException {
+
+ for (int i = 0; i < numQueues; i++) {
+ setPolicyConf(Q_PREFIX + i, POLICY_PREFIX + i);
+ }
+ }
+
+ public SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
+ throws YarnException {
+ GetSubClusterInfoRequest request =
+ GetSubClusterInfoRequest.newInstance(subClusterId);
+ return stateStore.getSubCluster(request).getSubClusterInfo();
+ }
+
+ public SubClusterId queryApplicationHomeSC(ApplicationId appId)
+ throws YarnException {
+ GetApplicationHomeSubClusterRequest request =
+ GetApplicationHomeSubClusterRequest.newInstance(appId);
+
+ GetApplicationHomeSubClusterResponse response =
+ stateStore.getApplicationHomeSubCluster(request);
+
+ return response.getApplicationHomeSubCluster().getHomeSubCluster();
+ }
+
+ public SubClusterPolicyConfiguration queryPolicyConfiguration(String queue)
+ throws YarnException {
+ GetSubClusterPolicyConfigurationRequest request =
+ GetSubClusterPolicyConfigurationRequest.newInstance(queue);
+
+ GetSubClusterPolicyConfigurationResponse result =
+ stateStore.getPolicyConfiguration(request);
+ return result.getPolicyConfiguration();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
new file mode 100644
index 00000000000..53f4f8442ec
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Unit tests for FederationStateStoreFacade.
+ */
+@RunWith(Parameterized.class)
+public class TestFederationStateStoreFacade {
+
+ @Parameters
+ public static Collection getParameters() {
+ return Arrays
+ .asList(new Boolean[][] {{Boolean.FALSE }, {Boolean.TRUE } });
+ }
+
+ private final long clusterTs = System.currentTimeMillis();
+ private final int numSubClusters = 3;
+ private final int numApps = 5;
+ private final int numQueues = 2;
+
+ private Configuration conf;
+ private FederationStateStore stateStore;
+ private FederationStateStoreTestUtil stateStoreTestUtil;
+ private FederationStateStoreFacade facade =
+ FederationStateStoreFacade.getInstance();
+
+ public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
+ conf = new Configuration();
+ if (!(isCachingEnabled.booleanValue())) {
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException, YarnException {
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(conf);
+ facade.reinitialize(stateStore, conf);
+ // hydrate the store
+ stateStoreTestUtil = new FederationStateStoreTestUtil(stateStore);
+ stateStoreTestUtil.registerSubClusters(numSubClusters);
+ stateStoreTestUtil.addAppsHomeSC(clusterTs, numApps);
+ stateStoreTestUtil.addPolicyConfigs(numQueues);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ stateStore.close();
+ stateStore = null;
+ }
+
+ @Test
+ public void testGetSubCluster() throws YarnException {
+ for (int i = 0; i < numSubClusters; i++) {
+ SubClusterId subClusterId =
+ SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i);
+ Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
+ facade.getSubCluster(subClusterId));
+ }
+ }
+
+ @Test
+ public void testGetSubClusterFlushCache() throws YarnException {
+ for (int i = 0; i < numSubClusters; i++) {
+ SubClusterId subClusterId =
+ SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i);
+ Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
+ facade.getSubCluster(subClusterId, true));
+ }
+ }
+
+ @Test
+ public void testGetSubClusters() throws YarnException {
+ Map subClusters =
+ facade.getSubClusters(false);
+ for (SubClusterId subClusterId : subClusters.keySet()) {
+ Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
+ subClusters.get(subClusterId));
+ }
+ }
+
+ @Test
+ public void testGetPolicyConfiguration() throws YarnException {
+ for (int i = 0; i < numQueues; i++) {
+ String queue = FederationStateStoreTestUtil.Q_PREFIX + i;
+ Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue),
+ facade.getPolicyConfiguration(queue));
+ }
+ }
+
+ @Test
+ public void testGetPoliciesConfigurations() throws YarnException {
+ Map queuePolicies =
+ facade.getPoliciesConfigurations();
+ for (String queue : queuePolicies.keySet()) {
+ Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue),
+ queuePolicies.get(queue));
+ }
+ }
+
+ @Test
+ public void testGetHomeSubClusterForApp() throws YarnException {
+ for (int i = 0; i < numApps; i++) {
+ ApplicationId appId = ApplicationId.newInstance(clusterTs, i);
+ Assert.assertEquals(stateStoreTestUtil.queryApplicationHomeSC(appId),
+ facade.getApplicationHomeSubCluster(appId));
+ }
+ }
+
+}