From a716459cdf63e4fcf90e41c8e30fb8d5eddacfbb Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Wed, 26 Apr 2023 05:09:45 +0800 Subject: [PATCH] YARN-11437. [Federation] SQLFederationStateStore Support Version. (#5589) --- .../MySQL/FederationStateStoreStoredProcs.sql | 17 +++ .../MySQL/FederationStateStoreTables.sql | 6 + .../MySQL/dropStoreProcedures.sql | 6 +- .../FederationStateStore/MySQL/dropTables.sql | 2 + .../FederationStateStoreStoredProcs.sql | 71 ++++++++++++ .../SQLServer/FederationStateStoreTables.sql | 30 +++++ .../SQLServer/dropStoreProcedures.sql | 8 ++ .../SQLServer/dropTables.sql | 4 + .../store/FederationStateStore.java | 27 ++++- .../impl/MemoryFederationStateStore.java | 22 ---- .../store/impl/SQLFederationStateStore.java | 108 ++++++++++++++++-- .../impl/ZookeeperFederationStateStore.java | 24 ---- .../impl/FederationStateStoreBaseTest.java | 23 +++- .../impl/HSQLDBFederationStateStore.java | 28 +++++ .../impl/TestMemoryFederationStateStore.java | 35 ------ .../TestZookeeperFederationStateStore.java | 44 ------- 16 files changed, 315 insertions(+), 140 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql index 2edda86cd3c..3ae33850e54 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql @@ -290,4 +290,21 @@ BEGIN SELECT ROW_COUNT() INTO rowCount_OUT; END // +CREATE PROCEDURE sp_storeVersion( + IN fedVersion_IN varbinary(1024), IN versionComment_IN varchar(255), OUT rowCount_OUT int) +BEGIN + DELETE FROM versions; + INSERT INTO versions (fedVersion, versionComment) + VALUES (fedVersion_IN, versionComment_IN); + SELECT ROW_COUNT() INTO rowCount_OUT; +END // + +CREATE PROCEDURE sp_getVersion( + OUT fedVersion_OUT varbinary(1024), OUT versionComment_OUT varchar(255)) +BEGIN + SELECT fedVersion, versionComment INTO fedVersion_OUT, versionComment_OUT + FROM versions + LIMIT 1; +END // + DELIMITER ; diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql index 9e864eb5ed7..fa823ff9a03 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql @@ -73,4 +73,10 @@ CREATE TABLE sequenceTable ( sequenceName varchar(255) NOT NULL, nextVal bigint(20) NOT NULL, CONSTRAINT pk_sequenceName PRIMARY KEY (sequenceName) +); + +CREATE TABLE versions ( + fedVersion varbinary(1024) NOT NULL, + versionComment VARCHAR(255), + CONSTRAINT pk_fedVersion PRIMARY KEY (fedVersion) ); \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql index e7a16c81deb..62760f9b190 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql @@ -68,4 +68,8 @@ DROP PROCEDURE sp_getDelegationToken; DROP PROCEDURE sp_updateDelegationToken; -DROP PROCEDURE sp_deleteDelegationToken; \ No newline at end of file +DROP PROCEDURE sp_deleteDelegationToken; + +DROP PROCEDURE sp_storeVersion; + +DROP PROCEDURE sp_getVersion; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql index 38d00d3cb10..0c496eb50c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql @@ -33,3 +33,5 @@ DROP TABLE masterKeys; DROP TABLE delegationTokens; DROP TABLE sequenceTable; + +DROP TABLE versions; diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql index d82b53e73aa..c0ffaf709ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql @@ -968,4 +968,75 @@ AS BEGIN ) WITH log END CATCH END; +GO + +IF OBJECT_ID ( '[sp_storeVersion]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_storeVersion]; +GO + +CREATE PROCEDURE [dbo].[sp_storeVersion] + @fedVersion_IN VARBINARY(1024), + @versionComment_IN VARCHAR(255), + @rowCount_OUT BIGINT OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + DELETE FROM [dbo].[versions]; + INSERT INTO [dbo].[versions] ( + [fedVersion], + [versionComment]) + VALUES ( + @fedVersion_IN, + @versionComment_IN); + SELECT @rowCount_OUT = @@ROWCOUNT; + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getVersion]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getVersion]; +GO + +CREATE PROCEDURE [dbo].[sp_getVersion] + @fedVersion_OUT VARCHAR(1024) OUTPUT, + @versionComment_OUT VARCHAR(255) OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + + SELECT @fedVersion_OUT = [fedVersion], + @versionComment_OUT = [versionComment] + FROM [dbo].[versions] + LIMIT 1; + + END TRY + + BEGIN CATCH + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; GO \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql index 4d187d4459a..da0d2ed6468 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql @@ -309,4 +309,34 @@ IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables ELSE PRINT 'Table sequenceTable exists, no operation required...' GO +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'versions' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table versions does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[versions]( + fedVersion VARBINARY(1024) NOT NULL, + versionComment VARCHAR(255) NOT NULL + CONSTRAINT [pk_fedVersion] PRIMARY KEY + ( + [fedVersion] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table versions created.' + END +ELSE + PRINT 'Table versions exists, no operation required...' + GO GO \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropStoreProcedures.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropStoreProcedures.sql index a6e35df1af4..53573cb1982 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropStoreProcedures.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropStoreProcedures.sql @@ -119,4 +119,12 @@ GO IF OBJECT_ID ('[sp_deleteDelegationToken]', 'P') IS NOT NULL DROP PROCEDURE [sp_deleteDelegationToken]; +GO + +IF OBJECT_ID ('[sp_storeVersion]', 'P') IS NOT NULL + DROP PROCEDURE [sp_storeVersion]; +GO + +IF OBJECT_ID ('[sp_getVersion]', 'P') IS NOT NULL + DROP PROCEDURE [sp_getVersion]; GO \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropTables.sql index 9a2188cbe19..592f0998163 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropTables.sql @@ -52,3 +52,7 @@ GO IF OBJECT_ID ( '[sequenceTable]', 'U' ) IS NOT NULL DROP TABLE [sequenceTable]; GO + +IF OBJECT_ID ( '[versions]', 'U' ) IS NOT NULL + DROP TABLE [versions]; +GO diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java index e65bdf42e2e..3949767d811 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java @@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.federation.store; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateVersionIncompatibleException; import org.apache.hadoop.yarn.server.records.Version; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * FederationStore extends the three interfaces used to coordinate the state of @@ -34,6 +37,9 @@ public interface FederationStateStore extends FederationPolicyStore, FederationReservationHomeSubClusterStore, FederationDelegationTokenStateStore { + public static final Logger LOG = + LoggerFactory.getLogger(FederationStateStore.class); + /** * Initialize the FederationStore. * @@ -76,5 +82,24 @@ public interface FederationStateStore extends * * @throws Exception an exception occurred in check version. */ - void checkVersion() throws Exception; + default void checkVersion() throws Exception { + Version loadedVersion = loadVersion(); + LOG.info("Loaded Router State Version Info = {}.", loadedVersion); + Version currentVersion = getCurrentVersion(); + if (loadedVersion != null && loadedVersion.equals(currentVersion)) { + return; + } + // if there is no version info, treat it as CURRENT_VERSION_INFO; + if (loadedVersion == null) { + loadedVersion = currentVersion; + } + if (loadedVersion.isCompatibleTo(currentVersion)) { + LOG.info("Storing Router State Version Info {}.", currentVersion); + storeVersion(); + } else { + throw new FederationStateVersionIncompatibleException( + "Expecting Router state version " + currentVersion + + ", but loading version " + loadedVersion); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 4aad86fbb16..1cf07075fd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -390,28 +390,6 @@ public class MemoryFederationStateStore implements FederationStateStore { version = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); } - @Override - public void checkVersion() throws Exception { - Version loadedVersion = loadVersion(); - LOG.info("Loaded Router State Version Info = {}.", loadedVersion); - Version currentVersion = getCurrentVersion(); - if (loadedVersion != null && loadedVersion.equals(currentVersion)) { - return; - } - // if there is no version info, treat it as CURRENT_VERSION_INFO; - if (loadedVersion == null) { - loadedVersion = currentVersion; - } - if (loadedVersion.isCompatibleTo(currentVersion)) { - LOG.info("Storing Router State Version Info {}.", currentVersion); - storeVersion(); - } else { - throw new FederationStateVersionIncompatibleException( - "Expecting Router state version " + currentVersion + - ", but loading version " + loadedVersion); - } - } - @Override public AddReservationHomeSubClusterResponse addReservationHomeSubCluster( AddReservationHomeSubClusterRequest request) throws YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index e1fc3f2a47e..5f9fe3075e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -30,7 +30,6 @@ import java.util.Calendar; import java.util.List; import java.util.TimeZone; -import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -38,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; @@ -102,6 +102,7 @@ import org.apache.hadoop.yarn.server.federation.store.sql.RouterMasterKeyHandler import org.apache.hadoop.yarn.server.federation.store.sql.RouterStoreTokenHandler; import org.apache.hadoop.yarn.server.federation.store.sql.RowCountHandler; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.MonotonicClock; import org.slf4j.Logger; @@ -202,6 +203,12 @@ public class SQLFederationStateStore implements FederationStateStore { protected static final String CALL_SP_DELETE_DELEGATIONTOKEN = "{call sp_deleteDelegationToken(?, ?)}"; + private static final String CALL_SP_STORE_VERSION = + "{call sp_storeVersion(?, ?, ?)}"; + + private static final String CALL_SP_LOAD_VERSION = + "{call sp_getVersion(?, ?)}"; + private Calendar utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); @@ -218,6 +225,8 @@ public class SQLFederationStateStore implements FederationStateStore { private Connection conn = null; private int maxAppsInStateStore; + protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 1); + @Override public void init(Configuration conf) throws YarnException { driverClass = @@ -993,22 +1002,107 @@ public class SQLFederationStateStore implements FederationStateStore { @Override public Version getCurrentVersion() { - throw new NotImplementedException("Code is not implemented"); + return CURRENT_VERSION_INFO; } @Override public Version loadVersion() throws Exception { - throw new NotImplementedException("Code is not implemented"); + return getVersion(); + } + + /** + * Query the Version information of Federation from the database. + * + * @return Version Info. + * @throws Exception Exception Information. + */ + public Version getVersion() throws Exception { + CallableStatement callableStatement = null; + Version version = null; + try { + callableStatement = getCallableStatement(CALL_SP_LOAD_VERSION); + + // Set the parameters for the stored procedure + callableStatement.registerOutParameter("fedVersion_OUT", java.sql.Types.VARBINARY); + callableStatement.registerOutParameter("versionComment_OUT", VARCHAR); + + // Execute the query + long startTime = clock.getTime(); + callableStatement.executeUpdate(); + long stopTime = clock.getTime(); + + // Parsing version information. + String versionComment = callableStatement.getString("versionComment_OUT"); + byte[] fedVersion = callableStatement.getBytes("fedVersion_OUT"); + if (versionComment != null && fedVersion != null) { + version = new VersionPBImpl(YarnServerCommonProtos.VersionProto.parseFrom(fedVersion)); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + } + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to select the version."); + } finally { + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, callableStatement); + } + return version; } @Override public void storeVersion() throws Exception { - throw new NotImplementedException("Code is not implemented"); + byte[] fedVersion = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + String versionComment = CURRENT_VERSION_INFO.toString(); + storeVersion(fedVersion, versionComment); } - @Override - public void checkVersion() throws Exception { - throw new NotImplementedException("Code is not implemented"); + /** + * Store the Federation Version in the database. + * + * @param fedVersion Federation Version. + * @param versionComment Federation Version Comment, + * We use the result of Version toString as version Comment. + * @throws YarnException indicates exceptions from yarn servers. + */ + public void storeVersion(byte[] fedVersion, String versionComment) throws YarnException { + CallableStatement callableStatement = null; + + try { + callableStatement = getCallableStatement(CALL_SP_STORE_VERSION); + + // Set the parameters for the stored procedure + callableStatement.setBytes("fedVersion_IN", fedVersion); + callableStatement.setString("versionComment_IN", versionComment); + callableStatement.registerOutParameter("rowCount_OUT", INTEGER); + + // Execute the query + long startTime = clock.getTime(); + callableStatement.executeUpdate(); + long stopTime = clock.getTime(); + + // Check the ROWCOUNT value, if it is equal to 0 it means the call + // did not add a new version into FederationStateStore + int rowCount = callableStatement.getInt("rowCount_OUT"); + if (rowCount == 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "The version %s was not insert into the StateStore.", versionComment); + } + // Check the ROWCOUNT value, if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + if (rowCount != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during insert the version %s.", versionComment); + } + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + LOG.info("Insert into the state store the version : {}.", versionComment); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to insert the newly version : %s.", versionComment); + } finally { + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, callableStatement); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java index 9a49a6d3a17..ca0a87112bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -673,30 +673,6 @@ public class ZookeeperFederationStateStore implements FederationStateStore { put(versionNode, data, isUpdate); } - @Override - public void checkVersion() throws Exception { - Version loadedVersion = loadVersion(); - LOG.info("Loaded Router State Version Info = {}.", loadedVersion); - Version currentVersion = getCurrentVersion(); - if (loadedVersion != null && loadedVersion.equals(currentVersion)) { - return; - } - - // if there is no version info, treat it as CURRENT_VERSION_INFO; - if (loadedVersion == null) { - loadedVersion = currentVersion; - } - - if (loadedVersion.isCompatibleTo(currentVersion)) { - LOG.info("Storing Router State Version Info {}.", currentVersion); - storeVersion(); - } else { - throw new FederationStateVersionIncompatibleException( - "Expecting Router state version " + currentVersion + - ", but loading version " + loadedVersion); - } - } - /** * Get the subcluster for an application. * @param appId Application identifier. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index b93763583d7..08de34552a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -81,12 +81,15 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRes import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; +import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.util.MonotonicClock; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; + /** * Base class for FederationMembershipStateStore implementations. */ @@ -1045,22 +1048,30 @@ public abstract class FederationStateStoreBaseTest { checkRouterStoreToken(identifier, getStoreTokenResp); } - @Test(expected = NotImplementedException.class) + @Test public void testGetCurrentVersion() { - stateStore.getCurrentVersion(); + Version version = stateStore.getCurrentVersion(); + assertEquals(1, version.getMajorVersion()); + assertEquals(1, version.getMinorVersion()); } - @Test(expected = NotImplementedException.class) + @Test public void testStoreVersion() throws Exception { stateStore.storeVersion(); + Version version = stateStore.getCurrentVersion(); + assertEquals(1, version.getMajorVersion()); + assertEquals(1, version.getMinorVersion()); } - @Test(expected = NotImplementedException.class) + @Test public void testLoadVersion() throws Exception { - stateStore.loadVersion(); + stateStore.storeVersion(); + Version version = stateStore.loadVersion(); + assertEquals(1, version.getMajorVersion()); + assertEquals(1, version.getMinorVersion()); } - @Test(expected = NotImplementedException.class) + @Test public void testCheckVersion() throws Exception { stateStore.checkVersion(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java index 73b65feb48e..c3f6f6ffecd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java @@ -95,6 +95,11 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { + " nextVal bigint NOT NULL," + " CONSTRAINT pk_sequenceName PRIMARY KEY (sequenceName))"; + private static final String TABLE_VERSIONS = + "CREATE TABLE versions (" + + " fedVersion varbinary(1024) NOT NULL," + + " versionComment VARCHAR(255)," + + " CONSTRAINT pk_fedVersion PRIMARY KEY (fedVersion))"; private static final String SP_REGISTERSUBCLUSTER = "CREATE PROCEDURE sp_registerSubCluster(" + " IN subClusterId_IN varchar(256)," @@ -431,6 +436,25 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; " + " END "; + protected static final String SP_STORE_VERSION = + "CREATE PROCEDURE sp_storeVersion(" + + " IN fedVersion_IN varbinary(1024), IN versionComment_IN varchar(256), " + + " OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM versions;" + + " INSERT INTO versions (fedVersion, versionComment)" + + " VALUES (fedVersion_IN, versionComment_IN);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; " + + " END "; + + protected static final String SP_GET_VERSION = + "CREATE PROCEDURE sp_getVersion(" + + " OUT fedVersion_OUT varbinary(1024), OUT versionComment_OUT varchar(256))" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " SELECT fedVersion, versionComment INTO fedVersion_OUT, versionComment_OUT" + + " FROM versions; " + + " END "; + private List tables = new ArrayList<>(); @Override @@ -449,6 +473,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { conn.prepareStatement(TABLE_MASTERKEYS).execute(); conn.prepareStatement(TABLE_DELEGATIONTOKENS).execute(); conn.prepareStatement(TABLE_SEQUENCETABLE).execute(); + conn.prepareStatement(TABLE_VERSIONS).execute(); conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute(); conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute(); @@ -481,6 +506,9 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { conn.prepareStatement(SP_UPDATE_DELEGATIONTOKEN).execute(); conn.prepareStatement(SP_DELETE_DELEGATIONTOKEN).execute(); + conn.prepareStatement(SP_STORE_VERSION).execute(); + conn.prepareStatement(SP_GET_VERSION).execute(); + LOG.info("Database Init: Complete"); } catch (Exception e) { LOG.error("ERROR: failed to initialize HSQLDB {}.", e.getMessage()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index bb7e130b5e7..3db5c56fce7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -90,39 +90,4 @@ public class TestMemoryFederationStateStore extends FederationStateStoreBaseTest assertTrue(tokenIdentifier instanceof RMDelegationTokenIdentifier); assertEquals(identifier, tokenIdentifier); } - - @Test - public void testGetCurrentVersion() { - MemoryFederationStateStore memoryStateStore = - MemoryFederationStateStore.class.cast(this.getStateStore()); - Version version = memoryStateStore.getCurrentVersion(); - assertEquals(version.getMajorVersion(), 1); - assertEquals(version.getMinorVersion(), 1); - } - - @Test - public void testStoreVersion() throws Exception { - MemoryFederationStateStore memoryStateStore = - MemoryFederationStateStore.class.cast(this.getStateStore()); - memoryStateStore.storeVersion(); - Version version = memoryStateStore.getCurrentVersion(); - assertEquals(version.getMajorVersion(), 1); - assertEquals(version.getMinorVersion(), 1); - } - - @Test - public void testLoadVersion() throws Exception { - MemoryFederationStateStore memoryStateStore = - MemoryFederationStateStore.class.cast(this.getStateStore()); - Version version = memoryStateStore.loadVersion(); - assertEquals(version.getMajorVersion(), 1); - assertEquals(version.getMinorVersion(), 1); - } - - @Test - public void testCheckVersion() throws Exception { - MemoryFederationStateStore memoryStateStore = - MemoryFederationStateStore.class.cast(this.getStateStore()); - memoryStateStore.checkVersion(); - } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java index 739f3b6543c..5542453b205 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java @@ -278,48 +278,4 @@ public class TestZookeeperFederationStateStore extends FederationStateStoreBaseT assertNotNull(zkRouterStoreToken); assertEquals(token, zkRouterStoreToken); } - - @Test - public void testGetCurrentVersion() { - ZookeeperFederationStateStore zkFederationStateStore = - ZookeeperFederationStateStore.class.cast(this.getStateStore()); - Version version = zkFederationStateStore.getCurrentVersion(); - assertEquals(1, version.getMajorVersion()); - assertEquals(1, version.getMinorVersion()); - } - - @Test - public void testStoreVersion() throws Exception { - ZookeeperFederationStateStore zkFederationStateStore = - ZookeeperFederationStateStore.class.cast(this.getStateStore()); - zkFederationStateStore.storeVersion(); - Version version = zkFederationStateStore.loadVersion(); - assertEquals(1, version.getMajorVersion()); - assertEquals(1, version.getMinorVersion()); - } - - @Test - public void testLoadVersion() throws Exception { - ZookeeperFederationStateStore zkFederationStateStore = - ZookeeperFederationStateStore.class.cast(this.getStateStore()); - // We don't store version, loadversion directly will get a null value. - Version version = zkFederationStateStore.loadVersion(); - assertNull(version); - - // After storing the version information, we will get the accurate version information. - zkFederationStateStore.storeVersion(); - Version version1 = zkFederationStateStore.loadVersion(); - assertEquals(1, version1.getMajorVersion()); - assertEquals(1, version1.getMinorVersion()); - } - - @Test - public void testCheckVersion() throws Exception { - ZookeeperFederationStateStore zkFederationStateStore = - ZookeeperFederationStateStore.class.cast(this.getStateStore()); - zkFederationStateStore.checkVersion(); - Version version = zkFederationStateStore.loadVersion(); - assertEquals(1, version.getMajorVersion()); - assertEquals(1, version.getMinorVersion()); - } } \ No newline at end of file