From e0c3a44396f075c6f402b000a6b286e1d9178eb8 Mon Sep 17 00:00:00 2001 From: Subru Krishnan Date: Mon, 8 Aug 2016 14:53:38 -0700 Subject: [PATCH] YARN-5408. Compose Federation membership/application/policy APIs into an uber FederationStateStore API. (Ellen Hui via Subru). (cherry picked from commit 268389d0211a757439289bea84d502bed6f6ad52) --- ...erationApplicationHomeSubClusterStore.java | 18 ++---- .../store/FederationMembershipStateStore.java | 14 +--- .../store/FederationStateStore.java | 64 +++++++++++++++++++ .../impl/MemoryFederationStateStore.java | 19 ------ .../impl/FederationStateStoreBaseTest.java | 57 ++++++++--------- .../impl/TestMemoryFederationStateStore.java | 21 +----- 6 files changed, 99 insertions(+), 94 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationApplicationHomeSubClusterStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationApplicationHomeSubClusterStore.java index 217ee2e8e47..22bb88af7da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationApplicationHomeSubClusterStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationApplicationHomeSubClusterStore.java @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHom import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; -import org.apache.hadoop.yarn.server.records.Version; /** * FederationApplicationHomeSubClusterStore maintains the state of all @@ -49,15 +48,6 @@ import org.apache.hadoop.yarn.server.records.Version; @Unstable public interface FederationApplicationHomeSubClusterStore { - /** - * Get the {@link Version} of the underlying federation application state - * store. - * - * @return the {@link Version} of the underlying federation application state - * store - */ - Version getApplicationStateStoreVersion(); - /** * Register the home {@code SubClusterId} of the newly submitted * {@code ApplicationId}. Currently response is empty if the operation was @@ -91,16 +81,16 @@ public interface FederationApplicationHomeSubClusterStore { * {@code ApplicationId}. * * @param request contains the application queried - * @return {@code ApplicationHomeSubCluster} containing the application's - * home subcluster + * @return {@code ApplicationHomeSubCluster} containing the application's home + * subcluster * @throws YarnException if the request is invalid/fails */ GetApplicationHomeSubClusterResponse getApplicationHomeSubClusterMap( GetApplicationHomeSubClusterRequest request) throws YarnException; /** - * Get the {@code ApplicationHomeSubCluster} list representing the mapping - * of all submitted applications to it's home sub-cluster. + * Get the {@code ApplicationHomeSubCluster} list representing the mapping of + * all submitted applications to it's home sub-cluster. * * @param request empty representing all applications * @return the mapping of all submitted application to it's home sub-cluster diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationMembershipStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationMembershipStateStore.java index 378eadc49a5..7778d5f3c1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationMembershipStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationMembershipStateStore.java @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbea import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; -import org.apache.hadoop.yarn.server.records.Version; /** * FederationMembershipStateStore maintains the state of all @@ -41,15 +40,6 @@ import org.apache.hadoop.yarn.server.records.Version; @Unstable public interface FederationMembershipStateStore { - /** - * Get the {@link Version} of the underlying federation membership state - * store. - * - * @return the {@link Version} of the underlying federation membership state - * store - */ - Version getMembershipStateStoreVersion(); - /** * Register a subcluster by publishing capabilities as represented by * {@code SubClusterInfo} to indicate participation in federation. This is @@ -80,7 +70,7 @@ public interface FederationMembershipStateStore { */ SubClusterDeregisterResponse deregisterSubCluster( SubClusterDeregisterRequest subClusterDeregisterRequest) - throws YarnException; + throws YarnException; /** * Periodic heartbeat from a ResourceManager participating in @@ -96,7 +86,7 @@ public interface FederationMembershipStateStore { */ SubClusterHeartbeatResponse subClusterHeartbeat( SubClusterHeartbeatRequest subClusterHeartbeatRequest) - throws YarnException; + throws YarnException; /** * Get the membership information of subcluster as identified by diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java new file mode 100644 index 00000000000..9397e9c2404 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.records.Version; + +/** + * FederationStore extends the three interfaces used to coordinate the state of + * a federated cluster: {@link FederationApplicationHomeSubClusterStore}, + * {@link FederationMembershipStateStore}, and {@link FederationPolicyStore}. + * + */ +public interface FederationStateStore + extends FederationApplicationHomeSubClusterStore, + FederationMembershipStateStore, FederationPolicyStore { + + /** + * Initialize the FederationStore. + * + * @param conf the cluster configuration + * @throws YarnException if initialization fails + */ + void init(Configuration conf) throws YarnException; + + /** + * Perform any cleanup operations of the StateStore. + * + * @throws Exception if cleanup fails + */ + void close() throws Exception; + + /** + * Get the {@link Version} of the underlying federation state store client. + * + * @return the {@link Version} of the underlying federation store client + */ + Version getCurrentVersion(); + + /** + * Load the version information from the federation state store. + * + * @return the {@link Version} of the federation state store + */ + Version loadVersion(); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 7fdc4a91ea8..cea4ac250ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -36,11 +36,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbea import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; -import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.util.MonotonicClock; -import com.google.common.annotations.VisibleForTesting; - /** * In-memory implementation of FederationMembershipStateStore. */ @@ -51,11 +48,6 @@ public class MemoryFederationStateStore new ConcurrentHashMap(); private final MonotonicClock clock = new MonotonicClock(); - @Override - public Version getMembershipStateStoreVersion() { - return null; - } - @Override public SubClusterRegisterResponse registerSubCluster( SubClusterRegisterRequest request) throws YarnException { @@ -122,17 +114,6 @@ public class MemoryFederationStateStore } } return GetSubClustersInfoResponse.newInstance(result); - - } - - @VisibleForTesting - public Map getMembershipTable() { - return membership; - } - - @VisibleForTesting - public void clearMembershipTable() { - membership.clear(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 7eb1c863292..c76a485c2b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -19,25 +19,21 @@ package org.apache.hadoop.yarn.server.federation.store.impl; import java.io.IOException; -import org.junit.Before; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.util.MonotonicClock; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; /** @@ -45,17 +41,22 @@ import org.junit.Test; */ public abstract class FederationStateStoreBaseTest { - static final Logger LOG = - LoggerFactory.getLogger(FederationStateStoreBaseTest.class); private static final MonotonicClock CLOCK = new MonotonicClock(); - private FederationMembershipStateStore stateStore = getStateStore(); + private FederationMembershipStateStore stateStore; @Before public void before() throws IOException { - clearMembership(); + stateStore = getCleanStateStore(); } + @After + public void after() { + stateStore = null; + } + + protected abstract FederationMembershipStateStore getCleanStateStore(); + @Test public void testRegisterSubCluster() throws Exception { SubClusterId subClusterId = SubClusterId.newInstance("SC"); @@ -63,11 +64,9 @@ public abstract class FederationStateStoreBaseTest { SubClusterRegisterResponse result = stateStore.registerSubCluster( SubClusterRegisterRequest.newInstance(subClusterInfo)); - Map membership = getMembership(); - Assert.assertNotNull(membership.get(subClusterId)); Assert.assertNotNull(result); - Assert.assertEquals(subClusterInfo, membership.get(subClusterId)); + Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId)); } @Test @@ -83,10 +82,8 @@ public abstract class FederationStateStoreBaseTest { stateStore.deregisterSubCluster(deregisterRequest); - Map membership = getMembership(); - Assert.assertNotNull(membership.get(subClusterId)); - Assert.assertEquals(membership.get(subClusterId).getState(), - SubClusterState.SC_UNREGISTERED); + Assert.assertEquals(SubClusterState.SC_UNREGISTERED, + querySubClusterInfo(subClusterId).getState()); } @Test @@ -179,10 +176,9 @@ public abstract class FederationStateStoreBaseTest { .newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability"); stateStore.subClusterHeartbeat(heartbeatRequest); - Map membership = getMembership(); - Assert.assertEquals(membership.get(subClusterId).getState(), - SubClusterState.SC_RUNNING); - Assert.assertNotNull(membership.get(subClusterId).getLastHeartBeat()); + Assert.assertEquals(SubClusterState.SC_RUNNING, + querySubClusterInfo(subClusterId).getState()); + Assert.assertNotNull(querySubClusterInfo(subClusterId).getLastHeartBeat()); } @Test @@ -212,10 +208,11 @@ public abstract class FederationStateStoreBaseTest { CLOCK.getTime(), "cabability"); } - protected abstract Map getMembership(); - - protected abstract void clearMembership(); - - protected abstract FederationMembershipStateStore getStateStore(); + private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId) + throws YarnException { + GetSubClusterInfoRequest request = + GetSubClusterInfoRequest.newInstance(subClusterId); + return stateStore.getSubCluster(request).getSubClusterInfo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index b74ffbd7c89..9396edac9ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -17,11 +17,7 @@ package org.apache.hadoop.yarn.server.federation.store.impl; -import java.util.Map; - import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; /** * Unit tests for MemoryFederationStateStore. @@ -29,21 +25,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; public class TestMemoryFederationStateStore extends FederationStateStoreBaseTest { - private static final MemoryFederationStateStore STATESTORE = - new MemoryFederationStateStore(); - @Override - protected Map getMembership() { - return STATESTORE.getMembershipTable(); - } - - @Override - protected void clearMembership() { - STATESTORE.clearMembershipTable(); - } - - @Override - protected FederationMembershipStateStore getStateStore() { - return STATESTORE; + protected FederationMembershipStateStore getCleanStateStore() { + return new MemoryFederationStateStore(); } }