From 1965708d49a9c0e1cc6c2aef9c92da9c6b921c8b Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Sat, 3 Sep 2022 01:39:58 +0800 Subject: [PATCH] YARN-11273. Federation StateStore: Support storage/retrieval of Reservations With SQL. (#4817) --- .../MySQL/FederationStateStoreStoredProcs.sql | 49 ++ .../MySQL/FederationStateStoreTables.sql | 6 + .../MySQL/dropStoreProcedures.sql | 10 + .../FederationStateStore/MySQL/dropTables.sql | 2 + .../FederationStateStoreStoreProcs.sql | 177 +++++++ .../SQLServer/FederationStateStoreTables.sql | 32 ++ .../store/impl/SQLFederationStateStore.java | 357 ++++++++++++- .../utils/FederationStateStoreUtils.java | 54 ++ .../impl/HSQLDBFederationStateStore.java | 108 ++++ .../impl/TestSQLFederationStateStore.java | 489 +++++++++++++++++- 10 files changed, 1255 insertions(+), 29 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql index eae882e4a48..9434ed38488 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql @@ -159,4 +159,53 @@ BEGIN FROM policies WHERE queue = queue_IN; END // +CREATE PROCEDURE sp_addReservationHomeSubCluster( + IN reservationId_IN varchar(128), IN homeSubCluster_IN varchar(256), + OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int) +BEGIN + INSERT INTO reservationsHomeSubCluster + (reservationId,homeSubCluster) + (SELECT reservationId_IN, homeSubCluster_IN + FROM applicationsHomeSubCluster + WHERE reservationId = reservationId_IN + HAVING COUNT(*) = 0 ); + SELECT ROW_COUNT() INTO rowCount_OUT; + SELECT homeSubCluster INTO storedHomeSubCluster_OUT + FROM reservationsHomeSubCluster + WHERE applicationId = reservationId_IN; +END // + +CREATE PROCEDURE sp_getReservationHomeSubCluster( + IN reservationId_IN varchar(128), + OUT homeSubCluster_OUT varchar(256)) +BEGIN + SELECT homeSubCluster INTO homeSubCluster_OUT + FROM reservationsHomeSubCluster + WHERE reservationId = reservationId_IN; +END // + +CREATE PROCEDURE sp_getReservationsHomeSubCluster() +BEGIN + SELECT reservationId, homeSubCluster + FROM reservationsHomeSubCluster; +END // + +CREATE PROCEDURE sp_updateReservationHomeSubCluster( + IN reservationId_IN varchar(128), + IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int) +BEGIN + UPDATE reservationsHomeSubCluster + SET homeSubCluster = homeSubCluster_IN + WHERE reservationId = reservationId_IN; + SELECT ROW_COUNT() INTO rowCount_OUT; +END // + +CREATE PROCEDURE sp_deleteReservationHomeSubCluster( + IN reservationId_IN varchar(128), OUT rowCount_OUT int) +BEGIN + DELETE FROM reservationsHomeSubCluster + WHERE reservationId = reservationId_IN; + SELECT ROW_COUNT() INTO rowCount_OUT; +END // + DELIMITER ; diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql index 3a255f06bc3..6a3188bab6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql @@ -46,3 +46,9 @@ CREATE TABLE policies( params varbinary(32768), CONSTRAINT pk_queue PRIMARY KEY (queue) ); + +CREATE TABLE reservationsHomeSubCluster ( + reservationId varchar(128) NOT NULL, + homeSubCluster varchar(256) NOT NULL, + CONSTRAINT pk_reservationId PRIMARY KEY (reservationId) +); \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql index f24f3fb22b5..a2f0b882b3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql @@ -45,3 +45,13 @@ DROP PROCEDURE sp_setPolicyConfiguration; DROP PROCEDURE sp_getPolicyConfiguration; DROP PROCEDURE sp_getPoliciesConfigurations; + +DROP PROCEDURE sp_addReservationHomeSubCluster; + +DROP PROCEDURE sp_getReservationHomeSubCluster; + +DROP PROCEDURE sp_getReservationsHomeSubCluster; + +DROP PROCEDURE sp_deleteReservationHomeSubCluster; + +DROP PROCEDURE sp_updateReservationHomeSubCluster; diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql index ea6567b028b..d29f8652c15 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql @@ -25,3 +25,5 @@ DROP TABLE applicationsHomeSubCluster; DROP TABLE membership; DROP TABLE policies; + +DROP TABLE reservationsHomeSubCluster; diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql index 66d6f0e2035..ab17aae4f88 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql @@ -508,4 +508,181 @@ AS BEGIN ) WITH log END CATCH END; +GO + +IF OBJECT_ID ( '[sp_addApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_addApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_addReservationHomeSubCluster] + @reservationId VARCHAR(128), + @homeSubCluster VARCHAR(256), + @storedHomeSubCluster VARCHAR(256) OUTPUT, + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + -- If application to sub-cluster map doesn't exist, insert it. + -- Otherwise don't change the current mapping. + IF NOT EXISTS (SELECT TOP 1 * + FROM [dbo].[reservationsHomeSubCluster] + WHERE [reservationId] = @reservationId) + + INSERT INTO [dbo].[reservationsHomeSubCluster] ( + [reservationId], + [homeSubCluster]) + VALUES ( + @reservationId, + @homeSubCluster); + -- End of the IF block + + SELECT @rowCount = @@ROWCOUNT; + + SELECT @storedHomeSubCluster = [homeSubCluster] + FROM [dbo].[reservationsHomeSubCluster] + WHERE [reservationId] = @reservationId; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_updateReservationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_updateReservationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_updateReservationHomeSubCluster] + @reservationId VARCHAR(128), + @homeSubCluster VARCHAR(256), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + UPDATE [dbo].[reservationsHomeSubCluster] + SET [homeSubCluster] = @homeSubCluster + WHERE [reservationId] = @reservationId; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getReservationsHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getReservationsHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_getReservationsHomeSubCluster] +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + SELECT [reservationId], [homeSubCluster], [createTime] + FROM [dbo].[reservationsHomeSubCluster] + END TRY + + BEGIN CATCH + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getReservationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getReservationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_getReservationHomeSubCluster] + @reservationId VARCHAR(128), + @homeSubCluster VARCHAR(256) OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + + SELECT @homeSubCluster = [homeSubCluster] + FROM [dbo].[reservationsHomeSubCluster] + WHERE [reservationId] = @reservationId; + + END TRY + + BEGIN CATCH + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_deleteReservationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_deleteReservationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_deleteReservationHomeSubCluster] + @reservationId VARCHAR(128), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + DELETE FROM [dbo].[reservationsHomeSubCluster] + WHERE [reservationId] = @reservationId; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; GO \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql index c16091ccf14..84f28acc174 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql @@ -124,3 +124,35 @@ ELSE PRINT 'Table policies exists, no operation required...' GO GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'reservationsHomeSubCluster' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table reservationsHomeSubCluster does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[reservationsHomeSubCluster]( + reservationId VARCHAR(128) COLLATE Latin1_General_100_BIN2 NOT NULL, + homeSubCluster VARCHAR(256) NOT NULL, + createTime DATETIME2 NOT NULL CONSTRAINT ts_createAppTime DEFAULT GETUTCDATE(), + + CONSTRAINT [pk_reservationId] PRIMARY KEY + ( + [reservationId] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table reservationsHomeSubCluster created.' + END +ELSE + PRINT 'Table reservationsHomeSubCluster exists, no operation required...' + GO +GO \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index dffcfa6a10e..0c0b5c9e0f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -30,8 +30,10 @@ import java.util.List; import java.util.TimeZone; import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; @@ -78,10 +80,12 @@ import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationH import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.MonotonicClock; @@ -140,6 +144,21 @@ public class SQLFederationStateStore implements FederationStateStore { private static final String CALL_SP_GET_POLICIES_CONFIGURATIONS = "{call sp_getPoliciesConfigurations()}"; + protected static final String CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER = + "{call sp_addReservationHomeSubCluster(?, ?, ?, ?)}"; + + protected static final String CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER = + "{call sp_getReservationHomeSubCluster(?, ?)}"; + + protected static final String CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER = + "{call sp_getReservationsHomeSubCluster()}"; + + protected static final String CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER = + "{call sp_deleteReservationHomeSubCluster(?, ?)}"; + + protected static final String CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER = + "{call sp_updateReservationHomeSubCluster(?, ?, ?)}"; + private Calendar utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); @@ -997,12 +1016,14 @@ public class SQLFederationStateStore implements FederationStateStore { * @return a connection from the DataSource pool. * @throws SQLException on failure */ - public Connection getConnection() throws SQLException { + @VisibleForTesting + protected Connection getConnection() throws SQLException { FederationStateStoreClientMetrics.incrConnections(); return dataSource.getConnection(); } - private CallableStatement getCallableStatement(String procedure) + @VisibleForTesting + protected CallableStatement getCallableStatement(String procedure) throws SQLException { return conn.prepareCall(procedure); } @@ -1016,30 +1037,352 @@ public class SQLFederationStateStore implements FederationStateStore { @Override public AddReservationHomeSubClusterResponse addReservationHomeSubCluster( AddReservationHomeSubClusterRequest request) throws YarnException { - throw new NotImplementedException("Code is not implemented"); + + // validate + FederationReservationHomeSubClusterStoreInputValidator.validate(request); + CallableStatement cstmt = null; + + ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster(); + ReservationId reservationId = reservationHomeSubCluster.getReservationId(); + SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster(); + SubClusterId subClusterHomeId = null; + + try { + + // Defined the sp_addReservationHomeSubCluster procedure + // this procedure requires 4 parameters + // Input parameters + // 1)IN reservationId_IN varchar(128) + // 2)IN homeSubCluster_IN varchar(256) + // Output parameters + // 3)OUT storedHomeSubCluster_OUT varchar(256) + // 4)OUT rowCount_OUT int + + // Call procedure + cstmt = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + // 1)IN reservationId_IN varchar(128) + cstmt.setString("reservationId_IN", reservationId.toString()); + // 2)IN homeSubCluster_IN varchar(256) + cstmt.setString("homeSubCluster_IN", subClusterId.getId()); + // 3) OUT storedHomeSubCluster_OUT varchar(256) + cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR); + // 4) OUT rowCount_OUT int + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + + // Execute the query + long startTime = clock.getTime(); + cstmt.executeUpdate(); + long stopTime = clock.getTime(); + + // Get SubClusterHome + String subClusterHomeIdString = cstmt.getString("storedHomeSubCluster_OUT"); + subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString); + + // Get rowCount + int rowCount = cstmt.getInt("rowCount_OUT"); + + // For failover reason, we check the returned subClusterId. + // 1.If it is equal to the subClusterId we sent, the call added the new + // reservation into FederationStateStore. + // 2.If the call returns a different subClusterId + // it means we already tried to insert this reservation + // but a component (Router/StateStore/RM) failed during the submission. + if (subClusterId.equals(subClusterHomeId)) { + // if it is equal to 0 + // it means the call did not add a new reservation into FederationStateStore. + if (rowCount == 0) { + LOG.info("The reservation {} was not inserted in the StateStore because it" + + " was already present in subCluster {}", reservationId, subClusterHomeId); + } else if (rowCount != 1) { + // if it is different from 1 + // it means the call had a wrong behavior. Maybe the database is not set correctly. + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during the insertion of subCluster %s according to reservation %s. " + + "The database expects to insert 1 record, but the number of " + + "inserted changes is greater than 1, " + + "please check the records of the database.", + subClusterId, reservationId); + } + } else { + // If it is different from 0, + // it means that there is a data situation that does not meet the expectations, + // and an exception should be thrown at this time + if (rowCount != 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "The reservation %s does exist but was overwritten.", reservationId); + } + LOG.info("Reservation: {} already present with subCluster: {}.", + reservationId, subClusterHomeId); + } + + // Record successful call time + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to insert the newly generated reservation %s to subCluster %s.", + reservationId, subClusterId); + } finally { + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); + } + + return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId); } @Override public GetReservationHomeSubClusterResponse getReservationHomeSubCluster( GetReservationHomeSubClusterRequest request) throws YarnException { - throw new NotImplementedException("Code is not implemented"); + // validate + FederationReservationHomeSubClusterStoreInputValidator.validate(request); + + CallableStatement cstmt = null; + ReservationId reservationId = request.getReservationId(); + SubClusterId subClusterId = null; + + try { + + // Defined the sp_getReservationHomeSubCluster procedure + // this procedure requires 2 parameters + // Input parameters + // 1)IN reservationId_IN varchar(128) + // Output parameters + // 2)OUT homeSubCluster_OUT varchar(256) + + cstmt = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + // 1)IN reservationId_IN varchar(128) + cstmt.setString("reservationId_IN", reservationId.toString()); + // 2)OUT homeSubCluster_OUT varchar(256) + cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR); + + // Execute the query + long startTime = clock.getTime(); + cstmt.execute(); + long stopTime = clock.getTime(); + + // Get Result + String subClusterHomeIdString = cstmt.getString("homeSubCluster_OUT"); + + if (StringUtils.isNotBlank(subClusterHomeIdString)) { + subClusterId = SubClusterId.newInstance(subClusterHomeIdString); + } else { + // If subClusterHomeIdString blank, we need to throw an exception + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Reservation %s does not exist", reservationId); + } + + LOG.info("Got the information about the specified reservation {} in subCluster = {}.", + reservationId, subClusterId); + + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + + ReservationHomeSubCluster homeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId); + return GetReservationHomeSubClusterResponse.newInstance(homeSubCluster); + + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to obtain the reservation information according to %s.", reservationId); + } finally { + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); + } + + throw new YarnException( + "Unable to obtain the reservation information according to " + reservationId); } @Override public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster( GetReservationsHomeSubClusterRequest request) throws YarnException { - throw new NotImplementedException("Code is not implemented"); + CallableStatement cstmt = null; + ResultSet rs = null; + List reservationsHomeSubClusters = new ArrayList<>(); + + try { + + // Defined the sp_getReservationsHomeSubCluster procedure + // This procedure requires no input parameters, but will have 2 output parameters + // Output parameters + // 1)OUT reservationId + // 2)OUT homeSubCluster + + cstmt = getCallableStatement(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER); + + // Execute the query + long startTime = clock.getTime(); + rs = cstmt.executeQuery(); + long stopTime = clock.getTime(); + + while (rs.next()) { + // Extract the output for each tuple + // 1)OUT reservationId + String dbReservationId = rs.getString("reservationId"); + // 2)OUT homeSubCluster + String dbHomeSubCluster = rs.getString("homeSubCluster"); + + // Generate parameters + ReservationId reservationId = ReservationId.parseReservationId(dbReservationId); + SubClusterId homeSubCluster = SubClusterId.newInstance(dbHomeSubCluster); + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster); + reservationsHomeSubClusters.add(reservationHomeSubCluster); + } + + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + + return GetReservationsHomeSubClusterResponse.newInstance( + reservationsHomeSubClusters); + } catch (Exception e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the information for all the reservations.", e); + } finally { + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs); + } + + throw new YarnException("Unable to obtain the information for all the reservations."); } @Override public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster( DeleteReservationHomeSubClusterRequest request) throws YarnException { - throw new NotImplementedException("Code is not implemented"); + + // validate + FederationReservationHomeSubClusterStoreInputValidator.validate(request); + + CallableStatement cstmt = null; + ReservationId reservationId = request.getReservationId(); + + try { + + // Defined the sp_deleteReservationHomeSubCluster procedure + // This procedure requires 1 input parameters, 1 output parameters + // Input parameters + // 1)IN reservationId_IN varchar(128) + // Output parameters + // 2)OUT rowCount_OUT int + + cstmt = getCallableStatement(CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + // 1)IN reservationId_IN varchar(128) + cstmt.setString("reservationId_IN", reservationId.toString()); + // 2)OUT rowCount_OUT int + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + + // Execute the query + long startTime = clock.getTime(); + cstmt.executeUpdate(); + long stopTime = clock.getTime(); + + int rowCount = cstmt.getInt("rowCount_OUT"); + + // if it is equal to 0 it means the call + // did not delete the reservation from FederationStateStore + if (rowCount == 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Reservation %s does not exist", reservationId); + } else if (rowCount != 1) { + // if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during deleting the reservation %s. " + + "The database is expected to delete 1 record, " + + "but the number of deleted records returned by the database is greater than 1, " + + "indicating that a duplicate reservationId occurred during the deletion process.", + reservationId); + } + + LOG.info("Delete from the StateStore the reservation: {}.", reservationId); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + return DeleteReservationHomeSubClusterResponse.newInstance(); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to delete the reservation %s.", reservationId); + } finally { + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); + } + throw new YarnException("Unable to delete the reservation " + reservationId); } @Override public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster( UpdateReservationHomeSubClusterRequest request) throws YarnException { - throw new NotImplementedException("Code is not implemented"); + + // validate + FederationReservationHomeSubClusterStoreInputValidator.validate(request); + + CallableStatement cstmt = null; + ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster(); + ReservationId reservationId = reservationHomeSubCluster.getReservationId(); + SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster(); + + try { + + // Defined the sp_updateReservationHomeSubCluster procedure + // This procedure requires 2 input parameters, 1 output parameters + // Input parameters + // 1)IN reservationId_IN varchar(128) + // 2)IN homeSubCluster_IN varchar(256) + // Output parameters + // 3)OUT rowCount_OUT int + + cstmt = getCallableStatement(CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + // 1)IN reservationId_IN varchar(128) + cstmt.setString("reservationId_IN", reservationId.toString()); + // 2)IN homeSubCluster_IN varchar(256) + cstmt.setString("homeSubCluster_IN", subClusterId.getId()); + // 3)OUT rowCount_OUT int + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + + // Execute the query + long startTime = clock.getTime(); + cstmt.executeUpdate(); + long stopTime = clock.getTime(); + + int rowCount = cstmt.getInt("rowCount_OUT"); + + // if it is equal to 0 it means the call + // did not update the reservation into FederationStateStore + if (rowCount == 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Reservation %s does not exist", reservationId); + } else if (rowCount != 1) { + // if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during update the subCluster %s according to reservation %s. " + + "The database is expected to update 1 record, " + + "but the number of database update records is greater than 1, " + + "the records of the database should be checked.", + subClusterId, reservationId); + } + LOG.info("Update the subCluster to {} for reservation {} in the StateStore.", + subClusterId, reservationId); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + return UpdateReservationHomeSubClusterResponse.newInstance(); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to update the subCluster %s according to reservation %s.", + subClusterId, reservationId); + } finally { + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); + } + throw new YarnException( + "Unable to update the subCluster " + subClusterId + + " according to reservation" + reservationId); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java index 27a4f7dba53..7dc53f8e0ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java @@ -145,6 +145,22 @@ public final class FederationStateStoreUtils { throw new FederationStateStoreException(errMsg); } + /** + * Throws an FederationStateStoreException due to an error in + * FederationStateStore. + * + * @param log the logger interface + * @param errMsgFormat the error message format string. + * @param args referenced by the format specifiers in the format string. + * @throws YarnException on failure + */ + public static void logAndThrowStoreException(Logger log, String errMsgFormat, Object... args) + throws YarnException { + String errMsg = String.format(errMsgFormat, args); + log.error(errMsg); + throw new FederationStateStoreException(errMsg); + } + /** * Throws an FederationStateStoreInvalidInputException due to an * error in FederationStateStore. @@ -179,6 +195,44 @@ public final class FederationStateStoreUtils { } } + /** + * Throws an FederationStateStoreRetriableException due to an + * error in FederationStateStore. + * + * @param t the throwable raised in the called class. + * @param log the logger interface. + * @param errMsgFormat the error message format string. + * @param args referenced by the format specifiers in the format string. + * @throws YarnException on failure + */ + public static void logAndThrowRetriableException( + Throwable t, Logger log, String errMsgFormat, Object... args) throws YarnException { + String errMsg = String.format(errMsgFormat, args); + if (t != null) { + log.error(errMsg, t); + throw new FederationStateStoreRetriableException(errMsg, t); + } else { + log.error(errMsg); + throw new FederationStateStoreRetriableException(errMsg); + } + } + + /** + * Throws an FederationStateStoreRetriableException due to an + * error in FederationStateStore. + * + * @param log the logger interface. + * @param errMsgFormat the error message format string. + * @param args referenced by the format specifiers in the format string. + * @throws YarnException on failure + */ + public static void logAndThrowRetriableException( + Logger log, String errMsgFormat, Object... args) throws YarnException { + String errMsg = String.format(errMsgFormat, args); + log.error(errMsg); + throw new FederationStateStoreRetriableException(errMsg); + } + /** * Sets a specific value for a specific property of * HikariDataSource SQL connections. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java index c3d0a9e1bbd..2eb95bbd6f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java @@ -58,6 +58,12 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { + " policyType varchar(256) NOT NULL, params varbinary(512)," + " CONSTRAINT pk_queue PRIMARY KEY (queue))"; + private static final String TABLE_RESERVATIONSHOMESUBCLUSTER = + " CREATE TABLE reservationsHomeSubCluster (" + + " reservationId varchar(128) NOT NULL," + + " homeSubCluster varchar(256) NOT NULL," + + " CONSTRAINT pk_reservationId PRIMARY KEY (reservationId))"; + private static final String SP_REGISTERSUBCLUSTER = "CREATE PROCEDURE sp_registerSubCluster(" + " IN subClusterId_IN varchar(256)," @@ -201,6 +207,101 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { + " DECLARE result CURSOR FOR" + " SELECT * FROM policies; OPEN result; END"; + private static final String SP_ADDRESERVATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_addReservationHomeSubCluster(" + + " IN reservationId_IN varchar(128)," + + " IN homeSubCluster_IN varchar(256)," + + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " INSERT INTO reservationsHomeSubCluster " + + " (reservationId,homeSubCluster) " + + " (SELECT reservationId_IN, homeSubCluster_IN" + + " FROM reservationsHomeSubCluster" + + " WHERE reservationId = reservationId_IN" + + " HAVING COUNT(*) = 0 );" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;" + + " SELECT homeSubCluster INTO storedHomeSubCluster_OUT" + + " FROM reservationsHomeSubCluster" + + " WHERE reservationId = reservationId_IN; END"; + + private static final String SP_GETRESERVATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_getReservationHomeSubCluster(" + + " IN reservationId_IN varchar(128)," + + " OUT homeSubCluster_OUT varchar(256))" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " SELECT homeSubCluster INTO homeSubCluster_OUT" + + " FROM reservationsHomeSubCluster" + + " WHERE reservationId = reservationId_IN; END"; + + private static final String SP_GETRESERVATIONSHOMESUBCLUSTER = + "CREATE PROCEDURE sp_getReservationsHomeSubCluster()" + + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + + " DECLARE result CURSOR FOR" + + " SELECT reservationId, homeSubCluster" + + " FROM reservationsHomeSubCluster; OPEN result; END"; + + private static final String SP_DELETERESERVATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_deleteReservationHomeSubCluster(" + + " IN reservationId_IN varchar(128), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM reservationsHomeSubCluster" + + " WHERE reservationId = reservationId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_UPDATERESERVATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_updateReservationHomeSubCluster(" + + " IN reservationId_IN varchar(128)," + + " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " UPDATE reservationsHomeSubCluster" + + " SET homeSubCluster = homeSubCluster_IN" + + " WHERE reservationId = reservationId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + protected static final String SP_DROP_ADDRESERVATIONHOMESUBCLUSTER = + "DROP PROCEDURE sp_addReservationHomeSubCluster"; + + protected static final String SP_ADDRESERVATIONHOMESUBCLUSTER2 = + "CREATE PROCEDURE sp_addReservationHomeSubCluster(" + + " IN reservationId_IN varchar(128)," + + " IN homeSubCluster_IN varchar(256)," + + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " INSERT INTO reservationsHomeSubCluster " + + " (reservationId,homeSubCluster) " + + " (SELECT reservationId_IN, homeSubCluster_IN" + + " FROM reservationsHomeSubCluster" + + " WHERE reservationId = reservationId_IN" + + " HAVING COUNT(*) = 0 );" + + " SELECT homeSubCluster, 2 INTO storedHomeSubCluster_OUT, rowCount_OUT" + + " FROM reservationsHomeSubCluster" + + " WHERE reservationId = reservationId_IN; END"; + + protected static final String SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER = + "DROP PROCEDURE sp_updateReservationHomeSubCluster"; + + protected static final String SP_UPDATERESERVATIONHOMESUBCLUSTER2 = + "CREATE PROCEDURE sp_updateReservationHomeSubCluster(" + + " IN reservationId_IN varchar(128)," + + " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " UPDATE reservationsHomeSubCluster" + + " SET homeSubCluster = homeSubCluster_IN" + + " WHERE reservationId = reservationId_IN;" + + " SET rowCount_OUT = 2; END"; + + protected static final String SP_DROP_DELETERESERVATIONHOMESUBCLUSTER = + "DROP PROCEDURE sp_deleteReservationHomeSubCluster"; + + protected static final String SP_DELETERESERVATIONHOMESUBCLUSTER2 = + "CREATE PROCEDURE sp_deleteReservationHomeSubCluster(" + + " IN reservationId_IN varchar(128), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM reservationsHomeSubCluster" + + " WHERE reservationId = reservationId_IN;" + + " SET rowCount_OUT = 2; END"; + + @Override public void init(Configuration conf) { try { @@ -216,6 +317,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { conn.prepareStatement(TABLE_APPLICATIONSHOMESUBCLUSTER).execute(); conn.prepareStatement(TABLE_MEMBERSHIP).execute(); conn.prepareStatement(TABLE_POLICIES).execute(); + conn.prepareStatement(TABLE_RESERVATIONSHOMESUBCLUSTER).execute(); conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute(); conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute(); @@ -233,6 +335,12 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { conn.prepareStatement(SP_GETPOLICYCONFIGURATION).execute(); conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute(); + conn.prepareStatement(SP_ADDRESERVATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_GETRESERVATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_GETRESERVATIONSHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_DELETERESERVATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_UPDATERESERVATIONHOMESUBCLUSTER).execute(); + LOG.info("Database Init: Complete"); } catch (SQLException e) { LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java index 72c820b0ed0..d257b870d07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java @@ -17,8 +17,10 @@ package org.apache.hadoop.yarn.server.federation.store.impl; -import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; @@ -26,14 +28,43 @@ import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateSto import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER; +import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER; +import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER; +import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER; +import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER; +import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_ADDRESERVATIONHOMESUBCLUSTER; +import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_ADDRESERVATIONHOMESUBCLUSTER2; +import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER; +import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_UPDATERESERVATIONHOMESUBCLUSTER2; +import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_DELETERESERVATIONHOMESUBCLUSTER; +import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DELETERESERVATIONHOMESUBCLUSTER2; /** * Unit tests for SQLFederationStateStore. */ public class TestSQLFederationStateStore extends FederationStateStoreBaseTest { + public static final Logger LOG = + LoggerFactory.getLogger(TestSQLFederationStateStore.class); private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource"; private static final String DATABASE_URL = "jdbc:hsqldb:mem:state"; private static final String DATABASE_USERNAME = "SA"; @@ -76,38 +107,452 @@ public class TestSQLFederationStateStore extends FederationStateStoreBaseTest { FederationStateStoreClientMetrics.getNumConnections()); } - @Test(expected = NotImplementedException.class) - public void testAddReservationHomeSubCluster() throws Exception { - super.testAddReservationHomeSubCluster(); + class ReservationHomeSC { + private String reservationId; + private String subHomeClusterId; + private int dbUpdateCount; + + ReservationHomeSC(String rId, String subHomeSCId, int dbUpdateCount) { + this.reservationId = rId; + this.subHomeClusterId = subHomeSCId; + this.dbUpdateCount = dbUpdateCount; + } } - @Test(expected = NotImplementedException.class) - public void testAddReservationHomeSubClusterReservationAlreadyExists() throws Exception { - super.testAddReservationHomeSubClusterReservationAlreadyExists(); + private ReservationHomeSC addReservationHomeSubCluster( + SQLFederationStateStore sqlFederationStateStore, String procedure, + String reservationId, String subHomeClusterId) throws SQLException, YarnException { + // procedure call parameter preparation + CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure); + cstmt.setString("reservationId_IN", reservationId); + cstmt.setString("homeSubCluster_IN", subHomeClusterId); + cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + + // execute procedure + cstmt.executeUpdate(); + + // get call result + String dbStoredHomeSubCluster = cstmt.getString("storedHomeSubCluster_OUT"); + int dbRowCount = cstmt.getInt("rowCount_OUT"); + + // return cstmt to pool + FederationStateStoreUtils.returnToPool(LOG, cstmt); + + return new ReservationHomeSC(reservationId, dbStoredHomeSubCluster, dbRowCount); } - @Test(expected = NotImplementedException.class) - public void testAddReservationHomeSubClusterAppAlreadyExistsInTheSameSC() throws Exception { - super.testAddReservationHomeSubClusterAppAlreadyExistsInTheSameSC(); + private ReservationHomeSC getReservationHomeSubCluster( + SQLFederationStateStore sqlFederationStateStore, String procedure, + String reservationId) throws SQLException, YarnException { + + // procedure call parameter preparation + CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure); + cstmt.setString("reservationId_IN", reservationId.toString()); + cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR); + + // execute procedure + cstmt.execute(); + + // get call result + String dBSubClusterHomeId = cstmt.getString("homeSubCluster_OUT"); + + // return cstmt to pool + FederationStateStoreUtils.returnToPool(LOG, cstmt); + + // returns the ReservationHomeSubCluster object + return new ReservationHomeSC(reservationId, dBSubClusterHomeId, 0); } - @Test(expected = NotImplementedException.class) - public void testDeleteReservationHomeSubCluster() throws Exception { - super.testDeleteReservationHomeSubCluster(); + private List getReservationsHomeSubCluster( + SQLFederationStateStore sqlFederationStateStore, String procedure) + throws SQLException, IOException, YarnException { + + List results = new ArrayList<>(); + + // procedure call parameter preparation + CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure); + + // execute procedure + ResultSet rs = cstmt.executeQuery(); + while (rs.next()) { + // 1)OUT reservationId + String dbReservationId = rs.getString("reservationId"); + + // 2)OUT homeSubCluster + String dbHomeSubCluster = rs.getString("homeSubCluster"); + results.add(new ReservationHomeSC(dbReservationId, dbHomeSubCluster, 0)); + } + + // return cstmt to pool + FederationStateStoreUtils.returnToPool(LOG, cstmt); + + // return ReservationHomeSubCluster List + return results; } - @Test(expected = NotImplementedException.class) - public void testDeleteReservationHomeSubClusterUnknownApp() throws Exception { - super.testDeleteReservationHomeSubClusterUnknownApp(); + private ReservationHomeSC updateReservationHomeSubCluster( + SQLFederationStateStore sqlFederationStateStore, String procedure, + String reservationId, String subHomeClusterId) + throws SQLException, IOException { + + // procedure call parameter preparation + CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure); + + // 1)IN reservationId_IN varchar(128) + cstmt.setString("reservationId_IN", reservationId); + // 2)IN homeSubCluster_IN varchar(256) + cstmt.setString("homeSubCluster_IN", subHomeClusterId); + // 3)OUT rowCount_OUT int + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + + // execute procedure + cstmt.executeUpdate(); + + // get rowcount + int rowCount = cstmt.getInt("rowCount_OUT"); + + // returns the ReservationHomeSubCluster object + return new ReservationHomeSC(reservationId, subHomeClusterId, rowCount); } - @Test(expected = NotImplementedException.class) - public void testUpdateReservationHomeSubCluster() throws Exception { - super.testUpdateReservationHomeSubCluster(); + private ReservationHomeSC deleteReservationHomeSubCluster( + SQLFederationStateStore sqlFederationStateStore, String procedure, + String reservationId) throws SQLException { + // procedure call parameter preparation + CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure); + + // Set the parameters for the stored procedure + // 1)IN reservationId_IN varchar(128) + cstmt.setString("reservationId_IN", reservationId); + // 2)OUT rowCount_OUT int + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + + // execute procedure + cstmt.executeUpdate(); + + // get rowcount + int rowCount = cstmt.getInt("rowCount_OUT"); + + // returns the ReservationHomeSubCluster object + return new ReservationHomeSC(reservationId, "-", rowCount); } - @Test(expected = NotImplementedException.class) - public void testUpdateReservationHomeSubClusterUnknownApp() throws Exception { - super.testUpdateReservationHomeSubClusterUnknownApp(); + /** + * This test case is used to check whether the procedure + * sp_addReservationHomeSubCluster can be executed normally. + * + * This test case will write 1 record to the database, and check returns the result. + * + * @throws Exception when the error occurs + */ + @Test + public void testCheckAddReservationHomeSubCluster() throws Exception { + FederationStateStore stateStore = getStateStore(); + Assert.assertTrue(stateStore instanceof SQLFederationStateStore); + + SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; + + // procedure call parameter preparation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); + String subHomeClusterId = "SC-1"; + ReservationHomeSC resultHC = addReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId); + + // validation results + Assert.assertNotNull(resultHC); + Assert.assertEquals(subHomeClusterId, resultHC.subHomeClusterId); + Assert.assertEquals(1, resultHC.dbUpdateCount); + } + + /** + * This test case is used to check whether the procedure + * sp_getReservationHomeSubCluster can be executed normally. + * + * Query according to reservationId, expect accurate query results, + * and check the homeSubCluster field. + * + * @throws Exception when the error occurs + */ + @Test + public void testCheckGetReservationHomeSubCluster() throws Exception { + FederationStateStore stateStore = getStateStore(); + Assert.assertTrue(stateStore instanceof SQLFederationStateStore); + + SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; + + // procedure call parameter preparation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); + String subHomeClusterId = "SC-1"; + addReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId); + + // Call getReservationHomeSubCluster to get the result + ReservationHomeSC resultHC = getReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString()); + + Assert.assertNotNull(resultHC); + Assert.assertEquals(subHomeClusterId, resultHC.subHomeClusterId); + Assert.assertEquals(reservationId.toString(), resultHC.reservationId); + } + + /** + * This test case is used to check whether the procedure + * sp_getReservationsHomeSubCluster can be executed normally. + * + * This test case will write 2 record to the database, and check returns the result. + * + * The test case will compare the number of returned records from the database + * and whether the content of each returned record is accurate. + * + * @throws Exception when the error occurs + */ + @Test + public void testCheckGetReservationsHomeSubCluster() throws Exception { + FederationStateStore stateStore = getStateStore(); + Assert.assertTrue(stateStore instanceof SQLFederationStateStore); + + SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; + + // add 1st record + ReservationId reservationId1 = ReservationId.newInstance(Time.now(), 1); + String subHomeClusterId1 = "SC-1"; + addReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId1.toString(), subHomeClusterId1); + + // add 2nd record + ReservationId reservationId2 = ReservationId.newInstance(Time.now(), 2); + String subHomeClusterId2 = "SC-2"; + addReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId2.toString(), subHomeClusterId2); + + List reservationHomeSubClusters = getReservationsHomeSubCluster( + sqlFederationStateStore, CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER); + + Assert.assertNotNull(reservationHomeSubClusters); + Assert.assertEquals(2, reservationHomeSubClusters.size()); + + ReservationHomeSC resultHC1 = reservationHomeSubClusters.get(0); + Assert.assertNotNull(resultHC1); + Assert.assertEquals(reservationId1.toString(), resultHC1.reservationId); + Assert.assertEquals(subHomeClusterId1, resultHC1.subHomeClusterId); + + ReservationHomeSC resultHC2 = reservationHomeSubClusters.get(1); + Assert.assertNotNull(resultHC2); + Assert.assertEquals(reservationId2.toString(), resultHC2.reservationId); + Assert.assertEquals(subHomeClusterId2, resultHC2.subHomeClusterId); + } + + /** + * This test case is used to check whether the procedure + * sp_updateReservationHomeSubCluster can be executed normally. + * + * This test case will first insert 1 record into the database, + * and then update the new SubHomeClusterId according to the reservationId. + * + * It will check whether the SubHomeClusterId is as expected after the addition and update. + * For the first time, the HomeClusterId of the database should be SC-1, + * and for the second time, the HomeClusterId of the database should be SC-2. + * + * @throws Exception when the error occurs + */ + @Test + public void testCheckUpdateReservationHomeSubCluster() throws Exception { + FederationStateStore stateStore = getStateStore(); + Assert.assertTrue(stateStore instanceof SQLFederationStateStore); + + SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; + + // procedure call parameter preparation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); + String subHomeClusterId = "SC-1"; + addReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId); + + // verify that the subHomeClusterId corresponding to reservationId is SC-1 + ReservationHomeSC resultHC = getReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString()); + Assert.assertNotNull(resultHC); + Assert.assertEquals(subHomeClusterId, resultHC.subHomeClusterId); + + // prepare to update parameters + String newSubHomeClusterId = "SC-2"; + ReservationHomeSC reservationHomeSubCluster = + updateReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), newSubHomeClusterId); + + Assert.assertNotNull(reservationHomeSubCluster); + Assert.assertEquals(1, reservationHomeSubCluster.dbUpdateCount); + + // verify that the subHomeClusterId corresponding to reservationId is SC-2 + ReservationHomeSC resultHC2 = getReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString()); + Assert.assertNotNull(resultHC2); + Assert.assertEquals(newSubHomeClusterId, resultHC2.subHomeClusterId); + } + + /** + * This test case is used to check whether the procedure + * sp_deleteReservationHomeSubCluster can be executed normally. + * + * This test case will first write 1 record to the database, + * and then delete the corresponding record according to reservationId. + * + * Query the corresponding homeSubCluster according to reservationId, + * we should get a NULL at this time. + * + * @throws Exception when the error occurs + */ + @Test + public void testCheckDeleteReservationHomeSubCluster() throws Exception { + FederationStateStore stateStore = getStateStore(); + Assert.assertTrue(stateStore instanceof SQLFederationStateStore); + + SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; + + // procedure call parameter preparation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); + String subHomeClusterId = "SC-1"; + addReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId); + + // call the delete method of the reservation + ReservationHomeSC resultHC = deleteReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER, reservationId.toString()); + + Assert.assertNotNull(resultHC); + Assert.assertEquals(1, resultHC.dbUpdateCount); + + // call getReservationHomeSubCluster to get the result + ReservationHomeSC resultHC1 = getReservationHomeSubCluster(sqlFederationStateStore, + CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString()); + Assert.assertNotNull(resultHC1); + Assert.assertEquals(null, resultHC1.subHomeClusterId); + } + + /** + * This test case is used to verify the processing logic of the incorrect number of + * updated records returned by the database when AddReservationHomeSubCluster is used. + * + * The probability of the database returning an update record greater than 1 is very low. + * + * @throws Exception when the error occurs + */ + @Test + public void testAddReservationHomeSubClusterAbnormalSituation() throws Exception { + FederationStateStore stateStore = getStateStore(); + Assert.assertTrue(stateStore instanceof SQLFederationStateStore); + + SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; + + Connection conn = sqlFederationStateStore.conn; + conn.prepareStatement(SP_DROP_ADDRESERVATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_ADDRESERVATIONHOMESUBCLUSTER2).execute(); + + ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId); + AddReservationHomeSubClusterRequest request = + AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster); + + String errorMsg = String.format( + "Wrong behavior during the insertion of subCluster %s according to reservation %s. " + + "The database expects to insert 1 record, but the number of " + + "inserted changes is greater than 1, " + + "please check the records of the database.", subClusterId, reservationId); + + LambdaTestUtils.intercept(YarnException.class, errorMsg, + () -> stateStore.addReservationHomeSubCluster(request)); + } + + /** + * This test case is used to verify the logic when calling the updateReservationHomeSubCluster + * method if the database returns an inaccurate result. + * + * The probability of the database returning an update record greater than 1 is very low. + * + * @throws Exception when the error occurs + */ + @Test + public void testUpdateReservationHomeSubClusterAbnormalSituation() throws Exception { + FederationStateStore stateStore = getStateStore(); + Assert.assertTrue(stateStore instanceof SQLFederationStateStore); + + SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; + + Connection conn = sqlFederationStateStore.conn; + conn.prepareStatement(SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_UPDATERESERVATIONHOMESUBCLUSTER2).execute(); + + ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); + SubClusterId subClusterId1 = SubClusterId.newInstance("SC"); + + // add Reservation data. + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId1); + AddReservationHomeSubClusterRequest addRequest = + AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster); + stateStore.addReservationHomeSubCluster(addRequest); + + SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); + ReservationHomeSubCluster reservationHomeSubCluster2 = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId2); + UpdateReservationHomeSubClusterRequest updateRequest = + UpdateReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster2); + + String errorMsg = String.format( + "Wrong behavior during update the subCluster %s according to reservation %s. " + + "The database is expected to update 1 record, " + + "but the number of database update records is greater than 1, " + + "the records of the database should be checked.", + subClusterId2, reservationId); + + LambdaTestUtils.intercept(YarnException.class, errorMsg, + () -> stateStore.updateReservationHomeSubCluster(updateRequest)); + } + + /** + * This test case is used to verify the logic when calling the deleteReservationHomeSubCluster + * method if the database returns an inaccurate result. + * + * The probability of the database returning an update record greater than 1 is very low. + * + * @throws Exception when the error occurs + */ + @Test + public void testDeleteReservationHomeSubClusterAbnormalSituation() throws Exception { + FederationStateStore stateStore = getStateStore(); + Assert.assertTrue(stateStore instanceof SQLFederationStateStore); + + SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; + + Connection conn = sqlFederationStateStore.conn; + conn.prepareStatement(SP_DROP_DELETERESERVATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_DELETERESERVATIONHOMESUBCLUSTER2).execute(); + + ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); + SubClusterId subClusterId1 = SubClusterId.newInstance("SC"); + + // add Reservation data. + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId1); + AddReservationHomeSubClusterRequest addRequest = + AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster); + stateStore.addReservationHomeSubCluster(addRequest); + + DeleteReservationHomeSubClusterRequest delRequest = + DeleteReservationHomeSubClusterRequest.newInstance(reservationId); + + String errorMsg = String.format( + "Wrong behavior during deleting the reservation %s. " + + "The database is expected to delete 1 record, " + + "but the number of deleted records returned by the database is greater than 1, " + + "indicating that a duplicate reservationId occurred during the deletion process.", + reservationId); + + LambdaTestUtils.intercept(YarnException.class, errorMsg, + () -> stateStore.deleteReservationHomeSubCluster(delRequest)); } } \ No newline at end of file