From 635521db4c6bd8362717b23cf5cb3ab3ce1d2e4f Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Wed, 19 Apr 2023 00:05:52 +0800 Subject: [PATCH] YARN-11438. [Federation] ZookeeperFederationStateStore Support Version. (#5537) --- .../impl/ZookeeperFederationStateStore.java | 46 +++++++++++++++++-- .../TestZookeeperFederationStateStore.java | 46 +++++++++++++++++++ 2 files changed, 87 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java index 536faa31dca..9a49a6d3a17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -30,7 +30,6 @@ import java.util.TimeZone; import java.util.Comparator; import java.util.stream.Collectors; -import org.apache.commons.lang3.NotImplementedException; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.VersionedValue; import org.apache.hadoop.classification.VisibleForTesting; @@ -43,8 +42,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateVersionIncompatibleException; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; @@ -104,6 +105,7 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservatio import org.apache.hadoop.yarn.server.federation.store.utils.FederationRouterRMTokenInputValidator; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; @@ -154,6 +156,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore { private final static String ROOT_ZNODE_NAME_POLICY = "policies"; private final static String ROOT_ZNODE_NAME_RESERVATION = "reservation"; + protected static final String ROOT_ZNODE_NAME_VERSION = "version"; + /** Store Delegation Token Node. */ private final static String ROUTER_RM_DT_SECRET_MANAGER_ROOT = "router_rm_dt_secret_manager_root"; private static final String ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME = @@ -184,6 +188,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore { private String membershipZNode; private String policiesZNode; private String reservationsZNode; + private String versionNode; private int maxAppsInStateStore; /** Directory to store the delegation token data. **/ @@ -195,6 +200,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore { private volatile Clock clock = SystemClock.getInstance(); + protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 1); + @VisibleForTesting private ZKFederationStateStoreOpDurations opDurations = ZKFederationStateStoreOpDurations.getInstance(); @@ -223,6 +230,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore { appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION); policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY); reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION); + versionNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_VERSION); // delegation token znodes routerRMDTSecretManagerRoot = getNodePath(baseZNode, ROUTER_RM_DT_SECRET_MANAGER_ROOT); @@ -245,6 +253,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore { zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, zkAcl); zkManager.createRootDirRecursively(routerRMDTMasterKeysRootPath, zkAcl); zkManager.createRootDirRecursively(routerRMDelegationTokensRootPath, zkAcl); + zkManager.createRootDirRecursively(versionNode, zkAcl); } catch (Exception e) { String errMsg = "Cannot create base directories: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); @@ -643,22 +652,49 @@ public class ZookeeperFederationStateStore implements FederationStateStore { @Override public Version getCurrentVersion() { - throw new NotImplementedException("Code is not implemented"); + return CURRENT_VERSION_INFO; } @Override public Version loadVersion() throws Exception { - throw new NotImplementedException("Code is not implemented"); + if (exists(versionNode)) { + byte[] data = get(versionNode); + if (data != null) { + return new VersionPBImpl(VersionProto.parseFrom(data)); + } + } + return null; } @Override public void storeVersion() throws Exception { - throw new NotImplementedException("Code is not implemented"); + byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + boolean isUpdate = exists(versionNode); + put(versionNode, data, isUpdate); } @Override public void checkVersion() throws Exception { - throw new NotImplementedException("Code is not implemented"); + Version loadedVersion = loadVersion(); + LOG.info("Loaded Router State Version Info = {}.", loadedVersion); + Version currentVersion = getCurrentVersion(); + if (loadedVersion != null && loadedVersion.equals(currentVersion)) { + return; + } + + // if there is no version info, treat it as CURRENT_VERSION_INFO; + if (loadedVersion == null) { + loadedVersion = currentVersion; + } + + if (loadedVersion.isCompatibleTo(currentVersion)) { + LOG.info("Storing Router State Version Info {}.", currentVersion); + storeVersion(); + } else { + throw new FederationStateVersionIncompatibleException( + "Expecting Router state version " + currentVersion + + ", but loading version " + loadedVersion); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java index ba22a1e1894..739f3b6543c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Before; @@ -52,6 +53,7 @@ import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; /** * Unit tests for ZookeeperFederationStateStore. @@ -276,4 +278,48 @@ public class TestZookeeperFederationStateStore extends FederationStateStoreBaseT assertNotNull(zkRouterStoreToken); assertEquals(token, zkRouterStoreToken); } + + @Test + public void testGetCurrentVersion() { + ZookeeperFederationStateStore zkFederationStateStore = + ZookeeperFederationStateStore.class.cast(this.getStateStore()); + Version version = zkFederationStateStore.getCurrentVersion(); + assertEquals(1, version.getMajorVersion()); + assertEquals(1, version.getMinorVersion()); + } + + @Test + public void testStoreVersion() throws Exception { + ZookeeperFederationStateStore zkFederationStateStore = + ZookeeperFederationStateStore.class.cast(this.getStateStore()); + zkFederationStateStore.storeVersion(); + Version version = zkFederationStateStore.loadVersion(); + assertEquals(1, version.getMajorVersion()); + assertEquals(1, version.getMinorVersion()); + } + + @Test + public void testLoadVersion() throws Exception { + ZookeeperFederationStateStore zkFederationStateStore = + ZookeeperFederationStateStore.class.cast(this.getStateStore()); + // We don't store version, loadversion directly will get a null value. + Version version = zkFederationStateStore.loadVersion(); + assertNull(version); + + // After storing the version information, we will get the accurate version information. + zkFederationStateStore.storeVersion(); + Version version1 = zkFederationStateStore.loadVersion(); + assertEquals(1, version1.getMajorVersion()); + assertEquals(1, version1.getMinorVersion()); + } + + @Test + public void testCheckVersion() throws Exception { + ZookeeperFederationStateStore zkFederationStateStore = + ZookeeperFederationStateStore.class.cast(this.getStateStore()); + zkFederationStateStore.checkVersion(); + Version version = zkFederationStateStore.loadVersion(); + assertEquals(1, version.getMajorVersion()); + assertEquals(1, version.getMinorVersion()); + } } \ No newline at end of file