YARN-11437. [Federation] SQLFederationStateStore Support Version. (#5589)

This commit is contained in:
slfan1989 2023-04-26 05:09:45 +08:00 committed by GitHub
parent 5af0845076
commit a716459cdf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 315 additions and 140 deletions

View File

@ -290,4 +290,21 @@ BEGIN
SELECT ROW_COUNT() INTO rowCount_OUT;
END //
CREATE PROCEDURE sp_storeVersion(
IN fedVersion_IN varbinary(1024), IN versionComment_IN varchar(255), OUT rowCount_OUT int)
BEGIN
DELETE FROM versions;
INSERT INTO versions (fedVersion, versionComment)
VALUES (fedVersion_IN, versionComment_IN);
SELECT ROW_COUNT() INTO rowCount_OUT;
END //
CREATE PROCEDURE sp_getVersion(
OUT fedVersion_OUT varbinary(1024), OUT versionComment_OUT varchar(255))
BEGIN
SELECT fedVersion, versionComment INTO fedVersion_OUT, versionComment_OUT
FROM versions
LIMIT 1;
END //
DELIMITER ;

View File

@ -73,4 +73,10 @@ CREATE TABLE sequenceTable (
sequenceName varchar(255) NOT NULL,
nextVal bigint(20) NOT NULL,
CONSTRAINT pk_sequenceName PRIMARY KEY (sequenceName)
);
CREATE TABLE versions (
fedVersion varbinary(1024) NOT NULL,
versionComment VARCHAR(255),
CONSTRAINT pk_fedVersion PRIMARY KEY (fedVersion)
);

View File

@ -68,4 +68,8 @@ DROP PROCEDURE sp_getDelegationToken;
DROP PROCEDURE sp_updateDelegationToken;
DROP PROCEDURE sp_deleteDelegationToken;
DROP PROCEDURE sp_deleteDelegationToken;
DROP PROCEDURE sp_storeVersion;
DROP PROCEDURE sp_getVersion;

View File

@ -33,3 +33,5 @@ DROP TABLE masterKeys;
DROP TABLE delegationTokens;
DROP TABLE sequenceTable;
DROP TABLE versions;

View File

@ -968,4 +968,75 @@ AS BEGIN
) WITH log
END CATCH
END;
GO
IF OBJECT_ID ( '[sp_storeVersion]', 'P' ) IS NOT NULL
DROP PROCEDURE [sp_storeVersion];
GO
CREATE PROCEDURE [dbo].[sp_storeVersion]
@fedVersion_IN VARBINARY(1024),
@versionComment_IN VARCHAR(255),
@rowCount_OUT BIGINT OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
BEGIN TRAN
DELETE FROM [dbo].[versions];
INSERT INTO [dbo].[versions] (
[fedVersion],
[versionComment])
VALUES (
@fedVersion_IN,
@versionComment_IN);
SELECT @rowCount_OUT = @@ROWCOUNT;
COMMIT TRAN
END TRY
BEGIN CATCH
ROLLBACK TRAN
SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
/* raise error and terminate the execution */
RAISERROR(@errorMessage, --- Error Message
1, -- Severity
-1 -- State
) WITH log
END CATCH
END;
GO
IF OBJECT_ID ( '[sp_getVersion]', 'P' ) IS NOT NULL
DROP PROCEDURE [sp_getVersion];
GO
CREATE PROCEDURE [dbo].[sp_getVersion]
@fedVersion_OUT VARCHAR(1024) OUTPUT,
@versionComment_OUT VARCHAR(255) OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
SELECT @fedVersion_OUT = [fedVersion],
@versionComment_OUT = [versionComment]
FROM [dbo].[versions]
LIMIT 1;
END TRY
BEGIN CATCH
SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
/* raise error and terminate the execution */
RAISERROR(@errorMessage, --- Error Message
1, -- Severity
-1 -- State
) WITH log
END CATCH
END;
GO

View File

@ -309,4 +309,34 @@ IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
ELSE
PRINT 'Table sequenceTable exists, no operation required...'
GO
GO
IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
WHERE name = 'versions'
AND schema_id = SCHEMA_ID('dbo'))
BEGIN
PRINT 'Table versions does not exist, create it...'
SET ANSI_NULLS ON
SET QUOTED_IDENTIFIER ON
SET ANSI_PADDING ON
CREATE TABLE [dbo].[versions](
fedVersion VARBINARY(1024) NOT NULL,
versionComment VARCHAR(255) NOT NULL
CONSTRAINT [pk_fedVersion] PRIMARY KEY
(
[fedVersion]
)
)
SET ANSI_PADDING OFF
PRINT 'Table versions created.'
END
ELSE
PRINT 'Table versions exists, no operation required...'
GO
GO

View File

@ -119,4 +119,12 @@ GO
IF OBJECT_ID ('[sp_deleteDelegationToken]', 'P') IS NOT NULL
DROP PROCEDURE [sp_deleteDelegationToken];
GO
IF OBJECT_ID ('[sp_storeVersion]', 'P') IS NOT NULL
DROP PROCEDURE [sp_storeVersion];
GO
IF OBJECT_ID ('[sp_getVersion]', 'P') IS NOT NULL
DROP PROCEDURE [sp_getVersion];
GO

View File

@ -52,3 +52,7 @@ GO
IF OBJECT_ID ( '[sequenceTable]', 'U' ) IS NOT NULL
DROP TABLE [sequenceTable];
GO
IF OBJECT_ID ( '[versions]', 'U' ) IS NOT NULL
DROP TABLE [versions];
GO

View File

@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.federation.store;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateVersionIncompatibleException;
import org.apache.hadoop.yarn.server.records.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* FederationStore extends the three interfaces used to coordinate the state of
@ -34,6 +37,9 @@ public interface FederationStateStore extends
FederationPolicyStore, FederationReservationHomeSubClusterStore,
FederationDelegationTokenStateStore {
public static final Logger LOG =
LoggerFactory.getLogger(FederationStateStore.class);
/**
* Initialize the FederationStore.
*
@ -76,5 +82,24 @@ public interface FederationStateStore extends
*
* @throws Exception an exception occurred in check version.
*/
void checkVersion() throws Exception;
default void checkVersion() throws Exception {
Version loadedVersion = loadVersion();
LOG.info("Loaded Router State Version Info = {}.", loadedVersion);
Version currentVersion = getCurrentVersion();
if (loadedVersion != null && loadedVersion.equals(currentVersion)) {
return;
}
// if there is no version info, treat it as CURRENT_VERSION_INFO;
if (loadedVersion == null) {
loadedVersion = currentVersion;
}
if (loadedVersion.isCompatibleTo(currentVersion)) {
LOG.info("Storing Router State Version Info {}.", currentVersion);
storeVersion();
} else {
throw new FederationStateVersionIncompatibleException(
"Expecting Router state version " + currentVersion +
", but loading version " + loadedVersion);
}
}
}

View File

@ -390,28 +390,6 @@ public class MemoryFederationStateStore implements FederationStateStore {
version = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
}
@Override
public void checkVersion() throws Exception {
Version loadedVersion = loadVersion();
LOG.info("Loaded Router State Version Info = {}.", loadedVersion);
Version currentVersion = getCurrentVersion();
if (loadedVersion != null && loadedVersion.equals(currentVersion)) {
return;
}
// if there is no version info, treat it as CURRENT_VERSION_INFO;
if (loadedVersion == null) {
loadedVersion = currentVersion;
}
if (loadedVersion.isCompatibleTo(currentVersion)) {
LOG.info("Storing Router State Version Info {}.", currentVersion);
storeVersion();
} else {
throw new FederationStateVersionIncompatibleException(
"Expecting Router state version " + currentVersion +
", but loading version " + loadedVersion);
}
}
@Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException {

View File

@ -30,7 +30,6 @@ import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -38,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
@ -102,6 +102,7 @@ import org.apache.hadoop.yarn.server.federation.store.sql.RouterMasterKeyHandler
import org.apache.hadoop.yarn.server.federation.store.sql.RouterStoreTokenHandler;
import org.apache.hadoop.yarn.server.federation.store.sql.RowCountHandler;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
@ -202,6 +203,12 @@ public class SQLFederationStateStore implements FederationStateStore {
protected static final String CALL_SP_DELETE_DELEGATIONTOKEN =
"{call sp_deleteDelegationToken(?, ?)}";
private static final String CALL_SP_STORE_VERSION =
"{call sp_storeVersion(?, ?, ?)}";
private static final String CALL_SP_LOAD_VERSION =
"{call sp_getVersion(?, ?)}";
private Calendar utcCalendar =
Calendar.getInstance(TimeZone.getTimeZone("UTC"));
@ -218,6 +225,8 @@ public class SQLFederationStateStore implements FederationStateStore {
private Connection conn = null;
private int maxAppsInStateStore;
protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 1);
@Override
public void init(Configuration conf) throws YarnException {
driverClass =
@ -993,22 +1002,107 @@ public class SQLFederationStateStore implements FederationStateStore {
@Override
public Version getCurrentVersion() {
throw new NotImplementedException("Code is not implemented");
return CURRENT_VERSION_INFO;
}
@Override
public Version loadVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
return getVersion();
}
/**
* Query the Version information of Federation from the database.
*
* @return Version Info.
* @throws Exception Exception Information.
*/
public Version getVersion() throws Exception {
CallableStatement callableStatement = null;
Version version = null;
try {
callableStatement = getCallableStatement(CALL_SP_LOAD_VERSION);
// Set the parameters for the stored procedure
callableStatement.registerOutParameter("fedVersion_OUT", java.sql.Types.VARBINARY);
callableStatement.registerOutParameter("versionComment_OUT", VARCHAR);
// Execute the query
long startTime = clock.getTime();
callableStatement.executeUpdate();
long stopTime = clock.getTime();
// Parsing version information.
String versionComment = callableStatement.getString("versionComment_OUT");
byte[] fedVersion = callableStatement.getBytes("fedVersion_OUT");
if (versionComment != null && fedVersion != null) {
version = new VersionPBImpl(YarnServerCommonProtos.VersionProto.parseFrom(fedVersion));
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
}
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to select the version.");
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, callableStatement);
}
return version;
}
@Override
public void storeVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
byte[] fedVersion = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
String versionComment = CURRENT_VERSION_INFO.toString();
storeVersion(fedVersion, versionComment);
}
@Override
public void checkVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
/**
* Store the Federation Version in the database.
*
* @param fedVersion Federation Version.
* @param versionComment Federation Version Comment,
* We use the result of Version toString as version Comment.
* @throws YarnException indicates exceptions from yarn servers.
*/
public void storeVersion(byte[] fedVersion, String versionComment) throws YarnException {
CallableStatement callableStatement = null;
try {
callableStatement = getCallableStatement(CALL_SP_STORE_VERSION);
// Set the parameters for the stored procedure
callableStatement.setBytes("fedVersion_IN", fedVersion);
callableStatement.setString("versionComment_IN", versionComment);
callableStatement.registerOutParameter("rowCount_OUT", INTEGER);
// Execute the query
long startTime = clock.getTime();
callableStatement.executeUpdate();
long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not add a new version into FederationStateStore
int rowCount = callableStatement.getInt("rowCount_OUT");
if (rowCount == 0) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"The version %s was not insert into the StateStore.", versionComment);
}
// Check the ROWCOUNT value, if it is different from 1 it means the call
// had a wrong behavior. Maybe the database is not set correctly.
if (rowCount != 1) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Wrong behavior during insert the version %s.", versionComment);
}
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
LOG.info("Insert into the state store the version : {}.", versionComment);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to insert the newly version : %s.", versionComment);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, callableStatement);
}
}
@Override

View File

@ -673,30 +673,6 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
put(versionNode, data, isUpdate);
}
@Override
public void checkVersion() throws Exception {
Version loadedVersion = loadVersion();
LOG.info("Loaded Router State Version Info = {}.", loadedVersion);
Version currentVersion = getCurrentVersion();
if (loadedVersion != null && loadedVersion.equals(currentVersion)) {
return;
}
// if there is no version info, treat it as CURRENT_VERSION_INFO;
if (loadedVersion == null) {
loadedVersion = currentVersion;
}
if (loadedVersion.isCompatibleTo(currentVersion)) {
LOG.info("Storing Router State Version Info {}.", currentVersion);
storeVersion();
} else {
throw new FederationStateVersionIncompatibleException(
"Expecting Router state version " + currentVersion +
", but loading version " + loadedVersion);
}
}
/**
* Get the subcluster for an application.
* @param appId Application identifier.

View File

@ -81,12 +81,15 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRes
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* Base class for FederationMembershipStateStore implementations.
*/
@ -1045,22 +1048,30 @@ public abstract class FederationStateStoreBaseTest {
checkRouterStoreToken(identifier, getStoreTokenResp);
}
@Test(expected = NotImplementedException.class)
@Test
public void testGetCurrentVersion() {
stateStore.getCurrentVersion();
Version version = stateStore.getCurrentVersion();
assertEquals(1, version.getMajorVersion());
assertEquals(1, version.getMinorVersion());
}
@Test(expected = NotImplementedException.class)
@Test
public void testStoreVersion() throws Exception {
stateStore.storeVersion();
Version version = stateStore.getCurrentVersion();
assertEquals(1, version.getMajorVersion());
assertEquals(1, version.getMinorVersion());
}
@Test(expected = NotImplementedException.class)
@Test
public void testLoadVersion() throws Exception {
stateStore.loadVersion();
stateStore.storeVersion();
Version version = stateStore.loadVersion();
assertEquals(1, version.getMajorVersion());
assertEquals(1, version.getMinorVersion());
}
@Test(expected = NotImplementedException.class)
@Test
public void testCheckVersion() throws Exception {
stateStore.checkVersion();
}

View File

@ -95,6 +95,11 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+ " nextVal bigint NOT NULL,"
+ " CONSTRAINT pk_sequenceName PRIMARY KEY (sequenceName))";
private static final String TABLE_VERSIONS =
"CREATE TABLE versions ("
+ " fedVersion varbinary(1024) NOT NULL,"
+ " versionComment VARCHAR(255),"
+ " CONSTRAINT pk_fedVersion PRIMARY KEY (fedVersion))";
private static final String SP_REGISTERSUBCLUSTER =
"CREATE PROCEDURE sp_registerSubCluster("
+ " IN subClusterId_IN varchar(256),"
@ -431,6 +436,25 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+ " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; "
+ " END ";
protected static final String SP_STORE_VERSION =
"CREATE PROCEDURE sp_storeVersion("
+ " IN fedVersion_IN varbinary(1024), IN versionComment_IN varchar(256), "
+ " OUT rowCount_OUT int)"
+ " MODIFIES SQL DATA BEGIN ATOMIC"
+ " DELETE FROM versions;"
+ " INSERT INTO versions (fedVersion, versionComment)"
+ " VALUES (fedVersion_IN, versionComment_IN);"
+ " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; "
+ " END ";
protected static final String SP_GET_VERSION =
"CREATE PROCEDURE sp_getVersion("
+ " OUT fedVersion_OUT varbinary(1024), OUT versionComment_OUT varchar(256))"
+ " MODIFIES SQL DATA BEGIN ATOMIC"
+ " SELECT fedVersion, versionComment INTO fedVersion_OUT, versionComment_OUT"
+ " FROM versions; "
+ " END ";
private List<String> tables = new ArrayList<>();
@Override
@ -449,6 +473,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
conn.prepareStatement(TABLE_MASTERKEYS).execute();
conn.prepareStatement(TABLE_DELEGATIONTOKENS).execute();
conn.prepareStatement(TABLE_SEQUENCETABLE).execute();
conn.prepareStatement(TABLE_VERSIONS).execute();
conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute();
conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute();
@ -481,6 +506,9 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
conn.prepareStatement(SP_UPDATE_DELEGATIONTOKEN).execute();
conn.prepareStatement(SP_DELETE_DELEGATIONTOKEN).execute();
conn.prepareStatement(SP_STORE_VERSION).execute();
conn.prepareStatement(SP_GET_VERSION).execute();
LOG.info("Database Init: Complete");
} catch (Exception e) {
LOG.error("ERROR: failed to initialize HSQLDB {}.", e.getMessage());

View File

@ -90,39 +90,4 @@ public class TestMemoryFederationStateStore extends FederationStateStoreBaseTest
assertTrue(tokenIdentifier instanceof RMDelegationTokenIdentifier);
assertEquals(identifier, tokenIdentifier);
}
@Test
public void testGetCurrentVersion() {
MemoryFederationStateStore memoryStateStore =
MemoryFederationStateStore.class.cast(this.getStateStore());
Version version = memoryStateStore.getCurrentVersion();
assertEquals(version.getMajorVersion(), 1);
assertEquals(version.getMinorVersion(), 1);
}
@Test
public void testStoreVersion() throws Exception {
MemoryFederationStateStore memoryStateStore =
MemoryFederationStateStore.class.cast(this.getStateStore());
memoryStateStore.storeVersion();
Version version = memoryStateStore.getCurrentVersion();
assertEquals(version.getMajorVersion(), 1);
assertEquals(version.getMinorVersion(), 1);
}
@Test
public void testLoadVersion() throws Exception {
MemoryFederationStateStore memoryStateStore =
MemoryFederationStateStore.class.cast(this.getStateStore());
Version version = memoryStateStore.loadVersion();
assertEquals(version.getMajorVersion(), 1);
assertEquals(version.getMinorVersion(), 1);
}
@Test
public void testCheckVersion() throws Exception {
MemoryFederationStateStore memoryStateStore =
MemoryFederationStateStore.class.cast(this.getStateStore());
memoryStateStore.checkVersion();
}
}

View File

@ -278,48 +278,4 @@ public class TestZookeeperFederationStateStore extends FederationStateStoreBaseT
assertNotNull(zkRouterStoreToken);
assertEquals(token, zkRouterStoreToken);
}
@Test
public void testGetCurrentVersion() {
ZookeeperFederationStateStore zkFederationStateStore =
ZookeeperFederationStateStore.class.cast(this.getStateStore());
Version version = zkFederationStateStore.getCurrentVersion();
assertEquals(1, version.getMajorVersion());
assertEquals(1, version.getMinorVersion());
}
@Test
public void testStoreVersion() throws Exception {
ZookeeperFederationStateStore zkFederationStateStore =
ZookeeperFederationStateStore.class.cast(this.getStateStore());
zkFederationStateStore.storeVersion();
Version version = zkFederationStateStore.loadVersion();
assertEquals(1, version.getMajorVersion());
assertEquals(1, version.getMinorVersion());
}
@Test
public void testLoadVersion() throws Exception {
ZookeeperFederationStateStore zkFederationStateStore =
ZookeeperFederationStateStore.class.cast(this.getStateStore());
// We don't store version, loadversion directly will get a null value.
Version version = zkFederationStateStore.loadVersion();
assertNull(version);
// After storing the version information, we will get the accurate version information.
zkFederationStateStore.storeVersion();
Version version1 = zkFederationStateStore.loadVersion();
assertEquals(1, version1.getMajorVersion());
assertEquals(1, version1.getMinorVersion());
}
@Test
public void testCheckVersion() throws Exception {
ZookeeperFederationStateStore zkFederationStateStore =
ZookeeperFederationStateStore.class.cast(this.getStateStore());
zkFederationStateStore.checkVersion();
Version version = zkFederationStateStore.loadVersion();
assertEquals(1, version.getMajorVersion());
assertEquals(1, version.getMinorVersion());
}
}