YARN-11273. Federation StateStore: Support storage/retrieval of Reservations With SQL. (#4817)

This commit is contained in:
slfan1989 2022-09-03 01:39:58 +08:00 committed by GitHub
parent b266f852d7
commit 1965708d49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1255 additions and 29 deletions

View File

@ -159,4 +159,53 @@ BEGIN
FROM policies WHERE queue = queue_IN; FROM policies WHERE queue = queue_IN;
END // END //
CREATE PROCEDURE sp_addReservationHomeSubCluster(
IN reservationId_IN varchar(128), IN homeSubCluster_IN varchar(256),
OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)
BEGIN
INSERT INTO reservationsHomeSubCluster
(reservationId,homeSubCluster)
(SELECT reservationId_IN, homeSubCluster_IN
FROM applicationsHomeSubCluster
WHERE reservationId = reservationId_IN
HAVING COUNT(*) = 0 );
SELECT ROW_COUNT() INTO rowCount_OUT;
SELECT homeSubCluster INTO storedHomeSubCluster_OUT
FROM reservationsHomeSubCluster
WHERE applicationId = reservationId_IN;
END //
CREATE PROCEDURE sp_getReservationHomeSubCluster(
IN reservationId_IN varchar(128),
OUT homeSubCluster_OUT varchar(256))
BEGIN
SELECT homeSubCluster INTO homeSubCluster_OUT
FROM reservationsHomeSubCluster
WHERE reservationId = reservationId_IN;
END //
CREATE PROCEDURE sp_getReservationsHomeSubCluster()
BEGIN
SELECT reservationId, homeSubCluster
FROM reservationsHomeSubCluster;
END //
CREATE PROCEDURE sp_updateReservationHomeSubCluster(
IN reservationId_IN varchar(128),
IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)
BEGIN
UPDATE reservationsHomeSubCluster
SET homeSubCluster = homeSubCluster_IN
WHERE reservationId = reservationId_IN;
SELECT ROW_COUNT() INTO rowCount_OUT;
END //
CREATE PROCEDURE sp_deleteReservationHomeSubCluster(
IN reservationId_IN varchar(128), OUT rowCount_OUT int)
BEGIN
DELETE FROM reservationsHomeSubCluster
WHERE reservationId = reservationId_IN;
SELECT ROW_COUNT() INTO rowCount_OUT;
END //
DELIMITER ; DELIMITER ;

View File

@ -46,3 +46,9 @@ CREATE TABLE policies(
params varbinary(32768), params varbinary(32768),
CONSTRAINT pk_queue PRIMARY KEY (queue) CONSTRAINT pk_queue PRIMARY KEY (queue)
); );
CREATE TABLE reservationsHomeSubCluster (
reservationId varchar(128) NOT NULL,
homeSubCluster varchar(256) NOT NULL,
CONSTRAINT pk_reservationId PRIMARY KEY (reservationId)
);

View File

@ -45,3 +45,13 @@ DROP PROCEDURE sp_setPolicyConfiguration;
DROP PROCEDURE sp_getPolicyConfiguration; DROP PROCEDURE sp_getPolicyConfiguration;
DROP PROCEDURE sp_getPoliciesConfigurations; DROP PROCEDURE sp_getPoliciesConfigurations;
DROP PROCEDURE sp_addReservationHomeSubCluster;
DROP PROCEDURE sp_getReservationHomeSubCluster;
DROP PROCEDURE sp_getReservationsHomeSubCluster;
DROP PROCEDURE sp_deleteReservationHomeSubCluster;
DROP PROCEDURE sp_updateReservationHomeSubCluster;

View File

@ -25,3 +25,5 @@ DROP TABLE applicationsHomeSubCluster;
DROP TABLE membership; DROP TABLE membership;
DROP TABLE policies; DROP TABLE policies;
DROP TABLE reservationsHomeSubCluster;

View File

@ -509,3 +509,180 @@ AS BEGIN
END CATCH END CATCH
END; END;
GO GO
IF OBJECT_ID ( '[sp_addApplicationHomeSubCluster]', 'P' ) IS NOT NULL
DROP PROCEDURE [sp_addApplicationHomeSubCluster];
GO
CREATE PROCEDURE [dbo].[sp_addReservationHomeSubCluster]
@reservationId VARCHAR(128),
@homeSubCluster VARCHAR(256),
@storedHomeSubCluster VARCHAR(256) OUTPUT,
@rowCount int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
BEGIN TRAN
-- If application to sub-cluster map doesn't exist, insert it.
-- Otherwise don't change the current mapping.
IF NOT EXISTS (SELECT TOP 1 *
FROM [dbo].[reservationsHomeSubCluster]
WHERE [reservationId] = @reservationId)
INSERT INTO [dbo].[reservationsHomeSubCluster] (
[reservationId],
[homeSubCluster])
VALUES (
@reservationId,
@homeSubCluster);
-- End of the IF block
SELECT @rowCount = @@ROWCOUNT;
SELECT @storedHomeSubCluster = [homeSubCluster]
FROM [dbo].[reservationsHomeSubCluster]
WHERE [reservationId] = @reservationId;
COMMIT TRAN
END TRY
BEGIN CATCH
ROLLBACK TRAN
SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
/* raise error and terminate the execution */
RAISERROR(@errorMessage, --- Error Message
1, -- Severity
-1 -- State
) WITH log
END CATCH
END;
GO
IF OBJECT_ID ( '[sp_updateReservationHomeSubCluster]', 'P' ) IS NOT NULL
DROP PROCEDURE [sp_updateReservationHomeSubCluster];
GO
CREATE PROCEDURE [dbo].[sp_updateReservationHomeSubCluster]
@reservationId VARCHAR(128),
@homeSubCluster VARCHAR(256),
@rowCount int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
BEGIN TRAN
UPDATE [dbo].[reservationsHomeSubCluster]
SET [homeSubCluster] = @homeSubCluster
WHERE [reservationId] = @reservationId;
SELECT @rowCount = @@ROWCOUNT;
COMMIT TRAN
END TRY
BEGIN CATCH
ROLLBACK TRAN
SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
/* raise error and terminate the execution */
RAISERROR(@errorMessage, --- Error Message
1, -- Severity
-1 -- State
) WITH log
END CATCH
END;
GO
IF OBJECT_ID ( '[sp_getReservationsHomeSubCluster]', 'P' ) IS NOT NULL
DROP PROCEDURE [sp_getReservationsHomeSubCluster];
GO
CREATE PROCEDURE [dbo].[sp_getReservationsHomeSubCluster]
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
SELECT [reservationId], [homeSubCluster], [createTime]
FROM [dbo].[reservationsHomeSubCluster]
END TRY
BEGIN CATCH
SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
/* raise error and terminate the execution */
RAISERROR(@errorMessage, --- Error Message
1, -- Severity
-1 -- State
) WITH log
END CATCH
END;
GO
IF OBJECT_ID ( '[sp_getReservationHomeSubCluster]', 'P' ) IS NOT NULL
DROP PROCEDURE [sp_getReservationHomeSubCluster];
GO
CREATE PROCEDURE [dbo].[sp_getReservationHomeSubCluster]
@reservationId VARCHAR(128),
@homeSubCluster VARCHAR(256) OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
SELECT @homeSubCluster = [homeSubCluster]
FROM [dbo].[reservationsHomeSubCluster]
WHERE [reservationId] = @reservationId;
END TRY
BEGIN CATCH
SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
/* raise error and terminate the execution */
RAISERROR(@errorMessage, --- Error Message
1, -- Severity
-1 -- State
) WITH log
END CATCH
END;
GO
IF OBJECT_ID ( '[sp_deleteReservationHomeSubCluster]', 'P' ) IS NOT NULL
DROP PROCEDURE [sp_deleteReservationHomeSubCluster];
GO
CREATE PROCEDURE [dbo].[sp_deleteReservationHomeSubCluster]
@reservationId VARCHAR(128),
@rowCount int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
BEGIN TRAN
DELETE FROM [dbo].[reservationsHomeSubCluster]
WHERE [reservationId] = @reservationId;
SELECT @rowCount = @@ROWCOUNT;
COMMIT TRAN
END TRY
BEGIN CATCH
ROLLBACK TRAN
SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
/* raise error and terminate the execution */
RAISERROR(@errorMessage, --- Error Message
1, -- Severity
-1 -- State
) WITH log
END CATCH
END;
GO

View File

@ -124,3 +124,35 @@ ELSE
PRINT 'Table policies exists, no operation required...' PRINT 'Table policies exists, no operation required...'
GO GO
GO GO
IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
WHERE name = 'reservationsHomeSubCluster'
AND schema_id = SCHEMA_ID('dbo'))
BEGIN
PRINT 'Table reservationsHomeSubCluster does not exist, create it...'
SET ANSI_NULLS ON
SET QUOTED_IDENTIFIER ON
SET ANSI_PADDING ON
CREATE TABLE [dbo].[reservationsHomeSubCluster](
reservationId VARCHAR(128) COLLATE Latin1_General_100_BIN2 NOT NULL,
homeSubCluster VARCHAR(256) NOT NULL,
createTime DATETIME2 NOT NULL CONSTRAINT ts_createAppTime DEFAULT GETUTCDATE(),
CONSTRAINT [pk_reservationId] PRIMARY KEY
(
[reservationId]
)
)
SET ANSI_PADDING OFF
PRINT 'Table reservationsHomeSubCluster created.'
END
ELSE
PRINT 'Table reservationsHomeSubCluster exists, no operation required...'
GO
GO

View File

@ -30,8 +30,10 @@ import java.util.List;
import java.util.TimeZone; import java.util.TimeZone;
import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
@ -78,10 +80,12 @@ import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationH
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.MonotonicClock;
@ -140,6 +144,21 @@ public class SQLFederationStateStore implements FederationStateStore {
private static final String CALL_SP_GET_POLICIES_CONFIGURATIONS = private static final String CALL_SP_GET_POLICIES_CONFIGURATIONS =
"{call sp_getPoliciesConfigurations()}"; "{call sp_getPoliciesConfigurations()}";
protected static final String CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER =
"{call sp_addReservationHomeSubCluster(?, ?, ?, ?)}";
protected static final String CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER =
"{call sp_getReservationHomeSubCluster(?, ?)}";
protected static final String CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER =
"{call sp_getReservationsHomeSubCluster()}";
protected static final String CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER =
"{call sp_deleteReservationHomeSubCluster(?, ?)}";
protected static final String CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER =
"{call sp_updateReservationHomeSubCluster(?, ?, ?)}";
private Calendar utcCalendar = private Calendar utcCalendar =
Calendar.getInstance(TimeZone.getTimeZone("UTC")); Calendar.getInstance(TimeZone.getTimeZone("UTC"));
@ -997,12 +1016,14 @@ public class SQLFederationStateStore implements FederationStateStore {
* @return a connection from the DataSource pool. * @return a connection from the DataSource pool.
* @throws SQLException on failure * @throws SQLException on failure
*/ */
public Connection getConnection() throws SQLException { @VisibleForTesting
protected Connection getConnection() throws SQLException {
FederationStateStoreClientMetrics.incrConnections(); FederationStateStoreClientMetrics.incrConnections();
return dataSource.getConnection(); return dataSource.getConnection();
} }
private CallableStatement getCallableStatement(String procedure) @VisibleForTesting
protected CallableStatement getCallableStatement(String procedure)
throws SQLException { throws SQLException {
return conn.prepareCall(procedure); return conn.prepareCall(procedure);
} }
@ -1016,30 +1037,352 @@ public class SQLFederationStateStore implements FederationStateStore {
@Override @Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster( public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException { AddReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
// validate
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
CallableStatement cstmt = null;
ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
ReservationId reservationId = reservationHomeSubCluster.getReservationId();
SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
SubClusterId subClusterHomeId = null;
try {
// Defined the sp_addReservationHomeSubCluster procedure
// this procedure requires 4 parameters
// Input parameters
// 1IN reservationId_IN varchar(128)
// 2IN homeSubCluster_IN varchar(256)
// Output parameters
// 3OUT storedHomeSubCluster_OUT varchar(256)
// 4OUT rowCount_OUT int
// Call procedure
cstmt = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
// 1IN reservationId_IN varchar(128)
cstmt.setString("reservationId_IN", reservationId.toString());
// 2IN homeSubCluster_IN varchar(256)
cstmt.setString("homeSubCluster_IN", subClusterId.getId());
// 3) OUT storedHomeSubCluster_OUT varchar(256)
cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR);
// 4) OUT rowCount_OUT int
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
// Get SubClusterHome
String subClusterHomeIdString = cstmt.getString("storedHomeSubCluster_OUT");
subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString);
// Get rowCount
int rowCount = cstmt.getInt("rowCount_OUT");
// For failover reason, we check the returned subClusterId.
// 1.If it is equal to the subClusterId we sent, the call added the new
// reservation into FederationStateStore.
// 2.If the call returns a different subClusterId
// it means we already tried to insert this reservation
// but a component (Router/StateStore/RM) failed during the submission.
if (subClusterId.equals(subClusterHomeId)) {
// if it is equal to 0
// it means the call did not add a new reservation into FederationStateStore.
if (rowCount == 0) {
LOG.info("The reservation {} was not inserted in the StateStore because it" +
" was already present in subCluster {}", reservationId, subClusterHomeId);
} else if (rowCount != 1) {
// if it is different from 1
// it means the call had a wrong behavior. Maybe the database is not set correctly.
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Wrong behavior during the insertion of subCluster %s according to reservation %s. " +
"The database expects to insert 1 record, but the number of " +
"inserted changes is greater than 1, " +
"please check the records of the database.",
subClusterId, reservationId);
}
} else {
// If it is different from 0,
// it means that there is a data situation that does not meet the expectations,
// and an exception should be thrown at this time
if (rowCount != 0) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"The reservation %s does exist but was overwritten.", reservationId);
}
LOG.info("Reservation: {} already present with subCluster: {}.",
reservationId, subClusterHomeId);
}
// Record successful call time
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to insert the newly generated reservation %s to subCluster %s.",
reservationId, subClusterId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId);
} }
@Override @Override
public GetReservationHomeSubClusterResponse getReservationHomeSubCluster( public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
GetReservationHomeSubClusterRequest request) throws YarnException { GetReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented"); // validate
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
CallableStatement cstmt = null;
ReservationId reservationId = request.getReservationId();
SubClusterId subClusterId = null;
try {
// Defined the sp_getReservationHomeSubCluster procedure
// this procedure requires 2 parameters
// Input parameters
// 1IN reservationId_IN varchar(128)
// Output parameters
// 2OUT homeSubCluster_OUT varchar(256)
cstmt = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
// 1IN reservationId_IN varchar(128)
cstmt.setString("reservationId_IN", reservationId.toString());
// 2OUT homeSubCluster_OUT varchar(256)
cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
// Execute the query
long startTime = clock.getTime();
cstmt.execute();
long stopTime = clock.getTime();
// Get Result
String subClusterHomeIdString = cstmt.getString("homeSubCluster_OUT");
if (StringUtils.isNotBlank(subClusterHomeIdString)) {
subClusterId = SubClusterId.newInstance(subClusterHomeIdString);
} else {
// If subClusterHomeIdString blank, we need to throw an exception
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Reservation %s does not exist", reservationId);
}
LOG.info("Got the information about the specified reservation {} in subCluster = {}.",
reservationId, subClusterId);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
ReservationHomeSubCluster homeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
return GetReservationHomeSubClusterResponse.newInstance(homeSubCluster);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to obtain the reservation information according to %s.", reservationId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
throw new YarnException(
"Unable to obtain the reservation information according to " + reservationId);
} }
@Override @Override
public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster( public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException { GetReservationsHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented"); CallableStatement cstmt = null;
ResultSet rs = null;
List<ReservationHomeSubCluster> reservationsHomeSubClusters = new ArrayList<>();
try {
// Defined the sp_getReservationsHomeSubCluster procedure
// This procedure requires no input parameters, but will have 2 output parameters
// Output parameters
// 1OUT reservationId
// 2OUT homeSubCluster
cstmt = getCallableStatement(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);
// Execute the query
long startTime = clock.getTime();
rs = cstmt.executeQuery();
long stopTime = clock.getTime();
while (rs.next()) {
// Extract the output for each tuple
// 1OUT reservationId
String dbReservationId = rs.getString("reservationId");
// 2OUT homeSubCluster
String dbHomeSubCluster = rs.getString("homeSubCluster");
// Generate parameters
ReservationId reservationId = ReservationId.parseReservationId(dbReservationId);
SubClusterId homeSubCluster = SubClusterId.newInstance(dbHomeSubCluster);
ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
reservationsHomeSubClusters.add(reservationHomeSubCluster);
}
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
return GetReservationsHomeSubClusterResponse.newInstance(
reservationsHomeSubClusters);
} catch (Exception e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the information for all the reservations.", e);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
}
throw new YarnException("Unable to obtain the information for all the reservations.");
} }
@Override @Override
public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster( public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
DeleteReservationHomeSubClusterRequest request) throws YarnException { DeleteReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
// validate
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
CallableStatement cstmt = null;
ReservationId reservationId = request.getReservationId();
try {
// Defined the sp_deleteReservationHomeSubCluster procedure
// This procedure requires 1 input parameters, 1 output parameters
// Input parameters
// 1IN reservationId_IN varchar(128)
// Output parameters
// 2OUT rowCount_OUT int
cstmt = getCallableStatement(CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
// 1IN reservationId_IN varchar(128)
cstmt.setString("reservationId_IN", reservationId.toString());
// 2OUT rowCount_OUT int
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
int rowCount = cstmt.getInt("rowCount_OUT");
// if it is equal to 0 it means the call
// did not delete the reservation from FederationStateStore
if (rowCount == 0) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Reservation %s does not exist", reservationId);
} else if (rowCount != 1) {
// if it is different from 1 it means the call
// had a wrong behavior. Maybe the database is not set correctly.
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Wrong behavior during deleting the reservation %s. " +
"The database is expected to delete 1 record, " +
"but the number of deleted records returned by the database is greater than 1, " +
"indicating that a duplicate reservationId occurred during the deletion process.",
reservationId);
}
LOG.info("Delete from the StateStore the reservation: {}.", reservationId);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
return DeleteReservationHomeSubClusterResponse.newInstance();
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to delete the reservation %s.", reservationId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
throw new YarnException("Unable to delete the reservation " + reservationId);
} }
@Override @Override
public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster( public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
UpdateReservationHomeSubClusterRequest request) throws YarnException { UpdateReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
// validate
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
CallableStatement cstmt = null;
ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
ReservationId reservationId = reservationHomeSubCluster.getReservationId();
SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
try {
// Defined the sp_updateReservationHomeSubCluster procedure
// This procedure requires 2 input parameters, 1 output parameters
// Input parameters
// 1IN reservationId_IN varchar(128)
// 2IN homeSubCluster_IN varchar(256)
// Output parameters
// 3OUT rowCount_OUT int
cstmt = getCallableStatement(CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
// 1IN reservationId_IN varchar(128)
cstmt.setString("reservationId_IN", reservationId.toString());
// 2IN homeSubCluster_IN varchar(256)
cstmt.setString("homeSubCluster_IN", subClusterId.getId());
// 3OUT rowCount_OUT int
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
int rowCount = cstmt.getInt("rowCount_OUT");
// if it is equal to 0 it means the call
// did not update the reservation into FederationStateStore
if (rowCount == 0) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Reservation %s does not exist", reservationId);
} else if (rowCount != 1) {
// if it is different from 1 it means the call
// had a wrong behavior. Maybe the database is not set correctly.
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Wrong behavior during update the subCluster %s according to reservation %s. " +
"The database is expected to update 1 record, " +
"but the number of database update records is greater than 1, " +
"the records of the database should be checked.",
subClusterId, reservationId);
}
LOG.info("Update the subCluster to {} for reservation {} in the StateStore.",
subClusterId, reservationId);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
return UpdateReservationHomeSubClusterResponse.newInstance();
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to update the subCluster %s according to reservation %s.",
subClusterId, reservationId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
throw new YarnException(
"Unable to update the subCluster " + subClusterId +
" according to reservation" + reservationId);
} }
} }

View File

@ -145,6 +145,22 @@ public final class FederationStateStoreUtils {
throw new FederationStateStoreException(errMsg); throw new FederationStateStoreException(errMsg);
} }
/**
* Throws an <code>FederationStateStoreException</code> due to an error in
* <code>FederationStateStore</code>.
*
* @param log the logger interface
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @throws YarnException on failure
*/
public static void logAndThrowStoreException(Logger log, String errMsgFormat, Object... args)
throws YarnException {
String errMsg = String.format(errMsgFormat, args);
log.error(errMsg);
throw new FederationStateStoreException(errMsg);
}
/** /**
* Throws an <code>FederationStateStoreInvalidInputException</code> due to an * Throws an <code>FederationStateStoreInvalidInputException</code> due to an
* error in <code>FederationStateStore</code>. * error in <code>FederationStateStore</code>.
@ -179,6 +195,44 @@ public final class FederationStateStoreUtils {
} }
} }
/**
* Throws an <code>FederationStateStoreRetriableException</code> due to an
* error in <code>FederationStateStore</code>.
*
* @param t the throwable raised in the called class.
* @param log the logger interface.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @throws YarnException on failure
*/
public static void logAndThrowRetriableException(
Throwable t, Logger log, String errMsgFormat, Object... args) throws YarnException {
String errMsg = String.format(errMsgFormat, args);
if (t != null) {
log.error(errMsg, t);
throw new FederationStateStoreRetriableException(errMsg, t);
} else {
log.error(errMsg);
throw new FederationStateStoreRetriableException(errMsg);
}
}
/**
* Throws an <code>FederationStateStoreRetriableException</code> due to an
* error in <code>FederationStateStore</code>.
*
* @param log the logger interface.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @throws YarnException on failure
*/
public static void logAndThrowRetriableException(
Logger log, String errMsgFormat, Object... args) throws YarnException {
String errMsg = String.format(errMsgFormat, args);
log.error(errMsg);
throw new FederationStateStoreRetriableException(errMsg);
}
/** /**
* Sets a specific value for a specific property of * Sets a specific value for a specific property of
* <code>HikariDataSource</code> SQL connections. * <code>HikariDataSource</code> SQL connections.

View File

@ -58,6 +58,12 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+ " policyType varchar(256) NOT NULL, params varbinary(512)," + " policyType varchar(256) NOT NULL, params varbinary(512),"
+ " CONSTRAINT pk_queue PRIMARY KEY (queue))"; + " CONSTRAINT pk_queue PRIMARY KEY (queue))";
private static final String TABLE_RESERVATIONSHOMESUBCLUSTER =
" CREATE TABLE reservationsHomeSubCluster ("
+ " reservationId varchar(128) NOT NULL,"
+ " homeSubCluster varchar(256) NOT NULL,"
+ " CONSTRAINT pk_reservationId PRIMARY KEY (reservationId))";
private static final String SP_REGISTERSUBCLUSTER = private static final String SP_REGISTERSUBCLUSTER =
"CREATE PROCEDURE sp_registerSubCluster(" "CREATE PROCEDURE sp_registerSubCluster("
+ " IN subClusterId_IN varchar(256)," + " IN subClusterId_IN varchar(256),"
@ -201,6 +207,101 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+ " DECLARE result CURSOR FOR" + " DECLARE result CURSOR FOR"
+ " SELECT * FROM policies; OPEN result; END"; + " SELECT * FROM policies; OPEN result; END";
private static final String SP_ADDRESERVATIONHOMESUBCLUSTER =
"CREATE PROCEDURE sp_addReservationHomeSubCluster("
+ " IN reservationId_IN varchar(128),"
+ " IN homeSubCluster_IN varchar(256),"
+ " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
+ " MODIFIES SQL DATA BEGIN ATOMIC"
+ " INSERT INTO reservationsHomeSubCluster "
+ " (reservationId,homeSubCluster) "
+ " (SELECT reservationId_IN, homeSubCluster_IN"
+ " FROM reservationsHomeSubCluster"
+ " WHERE reservationId = reservationId_IN"
+ " HAVING COUNT(*) = 0 );"
+ " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;"
+ " SELECT homeSubCluster INTO storedHomeSubCluster_OUT"
+ " FROM reservationsHomeSubCluster"
+ " WHERE reservationId = reservationId_IN; END";
private static final String SP_GETRESERVATIONHOMESUBCLUSTER =
"CREATE PROCEDURE sp_getReservationHomeSubCluster("
+ " IN reservationId_IN varchar(128),"
+ " OUT homeSubCluster_OUT varchar(256))"
+ " MODIFIES SQL DATA BEGIN ATOMIC"
+ " SELECT homeSubCluster INTO homeSubCluster_OUT"
+ " FROM reservationsHomeSubCluster"
+ " WHERE reservationId = reservationId_IN; END";
private static final String SP_GETRESERVATIONSHOMESUBCLUSTER =
"CREATE PROCEDURE sp_getReservationsHomeSubCluster()"
+ " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+ " DECLARE result CURSOR FOR"
+ " SELECT reservationId, homeSubCluster"
+ " FROM reservationsHomeSubCluster; OPEN result; END";
private static final String SP_DELETERESERVATIONHOMESUBCLUSTER =
"CREATE PROCEDURE sp_deleteReservationHomeSubCluster("
+ " IN reservationId_IN varchar(128), OUT rowCount_OUT int)"
+ " MODIFIES SQL DATA BEGIN ATOMIC"
+ " DELETE FROM reservationsHomeSubCluster"
+ " WHERE reservationId = reservationId_IN;"
+ " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
private static final String SP_UPDATERESERVATIONHOMESUBCLUSTER =
"CREATE PROCEDURE sp_updateReservationHomeSubCluster("
+ " IN reservationId_IN varchar(128),"
+ " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)"
+ " MODIFIES SQL DATA BEGIN ATOMIC"
+ " UPDATE reservationsHomeSubCluster"
+ " SET homeSubCluster = homeSubCluster_IN"
+ " WHERE reservationId = reservationId_IN;"
+ " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
protected static final String SP_DROP_ADDRESERVATIONHOMESUBCLUSTER =
"DROP PROCEDURE sp_addReservationHomeSubCluster";
protected static final String SP_ADDRESERVATIONHOMESUBCLUSTER2 =
"CREATE PROCEDURE sp_addReservationHomeSubCluster("
+ " IN reservationId_IN varchar(128),"
+ " IN homeSubCluster_IN varchar(256),"
+ " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
+ " MODIFIES SQL DATA BEGIN ATOMIC"
+ " INSERT INTO reservationsHomeSubCluster "
+ " (reservationId,homeSubCluster) "
+ " (SELECT reservationId_IN, homeSubCluster_IN"
+ " FROM reservationsHomeSubCluster"
+ " WHERE reservationId = reservationId_IN"
+ " HAVING COUNT(*) = 0 );"
+ " SELECT homeSubCluster, 2 INTO storedHomeSubCluster_OUT, rowCount_OUT"
+ " FROM reservationsHomeSubCluster"
+ " WHERE reservationId = reservationId_IN; END";
protected static final String SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER =
"DROP PROCEDURE sp_updateReservationHomeSubCluster";
protected static final String SP_UPDATERESERVATIONHOMESUBCLUSTER2 =
"CREATE PROCEDURE sp_updateReservationHomeSubCluster("
+ " IN reservationId_IN varchar(128),"
+ " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)"
+ " MODIFIES SQL DATA BEGIN ATOMIC"
+ " UPDATE reservationsHomeSubCluster"
+ " SET homeSubCluster = homeSubCluster_IN"
+ " WHERE reservationId = reservationId_IN;"
+ " SET rowCount_OUT = 2; END";
protected static final String SP_DROP_DELETERESERVATIONHOMESUBCLUSTER =
"DROP PROCEDURE sp_deleteReservationHomeSubCluster";
protected static final String SP_DELETERESERVATIONHOMESUBCLUSTER2 =
"CREATE PROCEDURE sp_deleteReservationHomeSubCluster("
+ " IN reservationId_IN varchar(128), OUT rowCount_OUT int)"
+ " MODIFIES SQL DATA BEGIN ATOMIC"
+ " DELETE FROM reservationsHomeSubCluster"
+ " WHERE reservationId = reservationId_IN;"
+ " SET rowCount_OUT = 2; END";
@Override @Override
public void init(Configuration conf) { public void init(Configuration conf) {
try { try {
@ -216,6 +317,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
conn.prepareStatement(TABLE_APPLICATIONSHOMESUBCLUSTER).execute(); conn.prepareStatement(TABLE_APPLICATIONSHOMESUBCLUSTER).execute();
conn.prepareStatement(TABLE_MEMBERSHIP).execute(); conn.prepareStatement(TABLE_MEMBERSHIP).execute();
conn.prepareStatement(TABLE_POLICIES).execute(); conn.prepareStatement(TABLE_POLICIES).execute();
conn.prepareStatement(TABLE_RESERVATIONSHOMESUBCLUSTER).execute();
conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute(); conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute();
conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute(); conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute();
@ -233,6 +335,12 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
conn.prepareStatement(SP_GETPOLICYCONFIGURATION).execute(); conn.prepareStatement(SP_GETPOLICYCONFIGURATION).execute();
conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute(); conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute();
conn.prepareStatement(SP_ADDRESERVATIONHOMESUBCLUSTER).execute();
conn.prepareStatement(SP_GETRESERVATIONHOMESUBCLUSTER).execute();
conn.prepareStatement(SP_GETRESERVATIONSHOMESUBCLUSTER).execute();
conn.prepareStatement(SP_DELETERESERVATIONHOMESUBCLUSTER).execute();
conn.prepareStatement(SP_UPDATERESERVATIONHOMESUBCLUSTER).execute();
LOG.info("Database Init: Complete"); LOG.info("Database Init: Complete");
} catch (SQLException e) { } catch (SQLException e) {
LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage()); LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage());

View File

@ -17,8 +17,10 @@
package org.apache.hadoop.yarn.server.federation.store.impl; package org.apache.hadoop.yarn.server.federation.store.impl;
import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
@ -26,14 +28,43 @@ import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateSto
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_ADDRESERVATIONHOMESUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_ADDRESERVATIONHOMESUBCLUSTER2;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_UPDATERESERVATIONHOMESUBCLUSTER2;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_DELETERESERVATIONHOMESUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DELETERESERVATIONHOMESUBCLUSTER2;
/** /**
* Unit tests for SQLFederationStateStore. * Unit tests for SQLFederationStateStore.
*/ */
public class TestSQLFederationStateStore extends FederationStateStoreBaseTest { public class TestSQLFederationStateStore extends FederationStateStoreBaseTest {
public static final Logger LOG =
LoggerFactory.getLogger(TestSQLFederationStateStore.class);
private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource"; private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource";
private static final String DATABASE_URL = "jdbc:hsqldb:mem:state"; private static final String DATABASE_URL = "jdbc:hsqldb:mem:state";
private static final String DATABASE_USERNAME = "SA"; private static final String DATABASE_USERNAME = "SA";
@ -76,38 +107,452 @@ public class TestSQLFederationStateStore extends FederationStateStoreBaseTest {
FederationStateStoreClientMetrics.getNumConnections()); FederationStateStoreClientMetrics.getNumConnections());
} }
@Test(expected = NotImplementedException.class) class ReservationHomeSC {
public void testAddReservationHomeSubCluster() throws Exception { private String reservationId;
super.testAddReservationHomeSubCluster(); private String subHomeClusterId;
private int dbUpdateCount;
ReservationHomeSC(String rId, String subHomeSCId, int dbUpdateCount) {
this.reservationId = rId;
this.subHomeClusterId = subHomeSCId;
this.dbUpdateCount = dbUpdateCount;
}
} }
@Test(expected = NotImplementedException.class) private ReservationHomeSC addReservationHomeSubCluster(
public void testAddReservationHomeSubClusterReservationAlreadyExists() throws Exception { SQLFederationStateStore sqlFederationStateStore, String procedure,
super.testAddReservationHomeSubClusterReservationAlreadyExists(); String reservationId, String subHomeClusterId) throws SQLException, YarnException {
// procedure call parameter preparation
CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure);
cstmt.setString("reservationId_IN", reservationId);
cstmt.setString("homeSubCluster_IN", subHomeClusterId);
cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR);
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// execute procedure
cstmt.executeUpdate();
// get call result
String dbStoredHomeSubCluster = cstmt.getString("storedHomeSubCluster_OUT");
int dbRowCount = cstmt.getInt("rowCount_OUT");
// return cstmt to pool
FederationStateStoreUtils.returnToPool(LOG, cstmt);
return new ReservationHomeSC(reservationId, dbStoredHomeSubCluster, dbRowCount);
} }
@Test(expected = NotImplementedException.class) private ReservationHomeSC getReservationHomeSubCluster(
public void testAddReservationHomeSubClusterAppAlreadyExistsInTheSameSC() throws Exception { SQLFederationStateStore sqlFederationStateStore, String procedure,
super.testAddReservationHomeSubClusterAppAlreadyExistsInTheSameSC(); String reservationId) throws SQLException, YarnException {
// procedure call parameter preparation
CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure);
cstmt.setString("reservationId_IN", reservationId.toString());
cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
// execute procedure
cstmt.execute();
// get call result
String dBSubClusterHomeId = cstmt.getString("homeSubCluster_OUT");
// return cstmt to pool
FederationStateStoreUtils.returnToPool(LOG, cstmt);
// returns the ReservationHomeSubCluster object
return new ReservationHomeSC(reservationId, dBSubClusterHomeId, 0);
} }
@Test(expected = NotImplementedException.class) private List<ReservationHomeSC> getReservationsHomeSubCluster(
public void testDeleteReservationHomeSubCluster() throws Exception { SQLFederationStateStore sqlFederationStateStore, String procedure)
super.testDeleteReservationHomeSubCluster(); throws SQLException, IOException, YarnException {
List<ReservationHomeSC> results = new ArrayList<>();
// procedure call parameter preparation
CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure);
// execute procedure
ResultSet rs = cstmt.executeQuery();
while (rs.next()) {
// 1OUT reservationId
String dbReservationId = rs.getString("reservationId");
// 2OUT homeSubCluster
String dbHomeSubCluster = rs.getString("homeSubCluster");
results.add(new ReservationHomeSC(dbReservationId, dbHomeSubCluster, 0));
}
// return cstmt to pool
FederationStateStoreUtils.returnToPool(LOG, cstmt);
// return ReservationHomeSubCluster List
return results;
} }
@Test(expected = NotImplementedException.class) private ReservationHomeSC updateReservationHomeSubCluster(
public void testDeleteReservationHomeSubClusterUnknownApp() throws Exception { SQLFederationStateStore sqlFederationStateStore, String procedure,
super.testDeleteReservationHomeSubClusterUnknownApp(); String reservationId, String subHomeClusterId)
throws SQLException, IOException {
// procedure call parameter preparation
CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure);
// 1IN reservationId_IN varchar(128)
cstmt.setString("reservationId_IN", reservationId);
// 2IN homeSubCluster_IN varchar(256)
cstmt.setString("homeSubCluster_IN", subHomeClusterId);
// 3OUT rowCount_OUT int
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// execute procedure
cstmt.executeUpdate();
// get rowcount
int rowCount = cstmt.getInt("rowCount_OUT");
// returns the ReservationHomeSubCluster object
return new ReservationHomeSC(reservationId, subHomeClusterId, rowCount);
} }
@Test(expected = NotImplementedException.class) private ReservationHomeSC deleteReservationHomeSubCluster(
public void testUpdateReservationHomeSubCluster() throws Exception { SQLFederationStateStore sqlFederationStateStore, String procedure,
super.testUpdateReservationHomeSubCluster(); String reservationId) throws SQLException {
// procedure call parameter preparation
CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure);
// Set the parameters for the stored procedure
// 1IN reservationId_IN varchar(128)
cstmt.setString("reservationId_IN", reservationId);
// 2OUT rowCount_OUT int
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// execute procedure
cstmt.executeUpdate();
// get rowcount
int rowCount = cstmt.getInt("rowCount_OUT");
// returns the ReservationHomeSubCluster object
return new ReservationHomeSC(reservationId, "-", rowCount);
} }
@Test(expected = NotImplementedException.class) /**
public void testUpdateReservationHomeSubClusterUnknownApp() throws Exception { * This test case is used to check whether the procedure
super.testUpdateReservationHomeSubClusterUnknownApp(); * sp_addReservationHomeSubCluster can be executed normally.
*
* This test case will write 1 record to the database, and check returns the result.
*
* @throws Exception when the error occurs
*/
@Test
public void testCheckAddReservationHomeSubCluster() throws Exception {
FederationStateStore stateStore = getStateStore();
Assert.assertTrue(stateStore instanceof SQLFederationStateStore);
SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore;
// procedure call parameter preparation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
String subHomeClusterId = "SC-1";
ReservationHomeSC resultHC = addReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId);
// validation results
Assert.assertNotNull(resultHC);
Assert.assertEquals(subHomeClusterId, resultHC.subHomeClusterId);
Assert.assertEquals(1, resultHC.dbUpdateCount);
}
/**
* This test case is used to check whether the procedure
* sp_getReservationHomeSubCluster can be executed normally.
*
* Query according to reservationId, expect accurate query results,
* and check the homeSubCluster field.
*
* @throws Exception when the error occurs
*/
@Test
public void testCheckGetReservationHomeSubCluster() throws Exception {
FederationStateStore stateStore = getStateStore();
Assert.assertTrue(stateStore instanceof SQLFederationStateStore);
SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore;
// procedure call parameter preparation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
String subHomeClusterId = "SC-1";
addReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId);
// Call getReservationHomeSubCluster to get the result
ReservationHomeSC resultHC = getReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString());
Assert.assertNotNull(resultHC);
Assert.assertEquals(subHomeClusterId, resultHC.subHomeClusterId);
Assert.assertEquals(reservationId.toString(), resultHC.reservationId);
}
/**
* This test case is used to check whether the procedure
* sp_getReservationsHomeSubCluster can be executed normally.
*
* This test case will write 2 record to the database, and check returns the result.
*
* The test case will compare the number of returned records from the database
* and whether the content of each returned record is accurate.
*
* @throws Exception when the error occurs
*/
@Test
public void testCheckGetReservationsHomeSubCluster() throws Exception {
FederationStateStore stateStore = getStateStore();
Assert.assertTrue(stateStore instanceof SQLFederationStateStore);
SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore;
// add 1st record
ReservationId reservationId1 = ReservationId.newInstance(Time.now(), 1);
String subHomeClusterId1 = "SC-1";
addReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId1.toString(), subHomeClusterId1);
// add 2nd record
ReservationId reservationId2 = ReservationId.newInstance(Time.now(), 2);
String subHomeClusterId2 = "SC-2";
addReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId2.toString(), subHomeClusterId2);
List<ReservationHomeSC> reservationHomeSubClusters = getReservationsHomeSubCluster(
sqlFederationStateStore, CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);
Assert.assertNotNull(reservationHomeSubClusters);
Assert.assertEquals(2, reservationHomeSubClusters.size());
ReservationHomeSC resultHC1 = reservationHomeSubClusters.get(0);
Assert.assertNotNull(resultHC1);
Assert.assertEquals(reservationId1.toString(), resultHC1.reservationId);
Assert.assertEquals(subHomeClusterId1, resultHC1.subHomeClusterId);
ReservationHomeSC resultHC2 = reservationHomeSubClusters.get(1);
Assert.assertNotNull(resultHC2);
Assert.assertEquals(reservationId2.toString(), resultHC2.reservationId);
Assert.assertEquals(subHomeClusterId2, resultHC2.subHomeClusterId);
}
/**
* This test case is used to check whether the procedure
* sp_updateReservationHomeSubCluster can be executed normally.
*
* This test case will first insert 1 record into the database,
* and then update the new SubHomeClusterId according to the reservationId.
*
* It will check whether the SubHomeClusterId is as expected after the addition and update.
* For the first time, the HomeClusterId of the database should be SC-1,
* and for the second time, the HomeClusterId of the database should be SC-2.
*
* @throws Exception when the error occurs
*/
@Test
public void testCheckUpdateReservationHomeSubCluster() throws Exception {
FederationStateStore stateStore = getStateStore();
Assert.assertTrue(stateStore instanceof SQLFederationStateStore);
SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore;
// procedure call parameter preparation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
String subHomeClusterId = "SC-1";
addReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId);
// verify that the subHomeClusterId corresponding to reservationId is SC-1
ReservationHomeSC resultHC = getReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString());
Assert.assertNotNull(resultHC);
Assert.assertEquals(subHomeClusterId, resultHC.subHomeClusterId);
// prepare to update parameters
String newSubHomeClusterId = "SC-2";
ReservationHomeSC reservationHomeSubCluster =
updateReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), newSubHomeClusterId);
Assert.assertNotNull(reservationHomeSubCluster);
Assert.assertEquals(1, reservationHomeSubCluster.dbUpdateCount);
// verify that the subHomeClusterId corresponding to reservationId is SC-2
ReservationHomeSC resultHC2 = getReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString());
Assert.assertNotNull(resultHC2);
Assert.assertEquals(newSubHomeClusterId, resultHC2.subHomeClusterId);
}
/**
* This test case is used to check whether the procedure
* sp_deleteReservationHomeSubCluster can be executed normally.
*
* This test case will first write 1 record to the database,
* and then delete the corresponding record according to reservationId.
*
* Query the corresponding homeSubCluster according to reservationId,
* we should get a NULL at this time.
*
* @throws Exception when the error occurs
*/
@Test
public void testCheckDeleteReservationHomeSubCluster() throws Exception {
FederationStateStore stateStore = getStateStore();
Assert.assertTrue(stateStore instanceof SQLFederationStateStore);
SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore;
// procedure call parameter preparation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
String subHomeClusterId = "SC-1";
addReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId);
// call the delete method of the reservation
ReservationHomeSC resultHC = deleteReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER, reservationId.toString());
Assert.assertNotNull(resultHC);
Assert.assertEquals(1, resultHC.dbUpdateCount);
// call getReservationHomeSubCluster to get the result
ReservationHomeSC resultHC1 = getReservationHomeSubCluster(sqlFederationStateStore,
CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString());
Assert.assertNotNull(resultHC1);
Assert.assertEquals(null, resultHC1.subHomeClusterId);
}
/**
* This test case is used to verify the processing logic of the incorrect number of
* updated records returned by the database when AddReservationHomeSubCluster is used.
*
* The probability of the database returning an update record greater than 1 is very low.
*
* @throws Exception when the error occurs
*/
@Test
public void testAddReservationHomeSubClusterAbnormalSituation() throws Exception {
FederationStateStore stateStore = getStateStore();
Assert.assertTrue(stateStore instanceof SQLFederationStateStore);
SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore;
Connection conn = sqlFederationStateStore.conn;
conn.prepareStatement(SP_DROP_ADDRESERVATIONHOMESUBCLUSTER).execute();
conn.prepareStatement(SP_ADDRESERVATIONHOMESUBCLUSTER2).execute();
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
SubClusterId subClusterId = SubClusterId.newInstance("SC");
ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
AddReservationHomeSubClusterRequest request =
AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
String errorMsg = String.format(
"Wrong behavior during the insertion of subCluster %s according to reservation %s. " +
"The database expects to insert 1 record, but the number of " +
"inserted changes is greater than 1, " +
"please check the records of the database.", subClusterId, reservationId);
LambdaTestUtils.intercept(YarnException.class, errorMsg,
() -> stateStore.addReservationHomeSubCluster(request));
}
/**
* This test case is used to verify the logic when calling the updateReservationHomeSubCluster
* method if the database returns an inaccurate result.
*
* The probability of the database returning an update record greater than 1 is very low.
*
* @throws Exception when the error occurs
*/
@Test
public void testUpdateReservationHomeSubClusterAbnormalSituation() throws Exception {
FederationStateStore stateStore = getStateStore();
Assert.assertTrue(stateStore instanceof SQLFederationStateStore);
SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore;
Connection conn = sqlFederationStateStore.conn;
conn.prepareStatement(SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER).execute();
conn.prepareStatement(SP_UPDATERESERVATIONHOMESUBCLUSTER2).execute();
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC");
// add Reservation data.
ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, subClusterId1);
AddReservationHomeSubClusterRequest addRequest =
AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
stateStore.addReservationHomeSubCluster(addRequest);
SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
ReservationHomeSubCluster reservationHomeSubCluster2 =
ReservationHomeSubCluster.newInstance(reservationId, subClusterId2);
UpdateReservationHomeSubClusterRequest updateRequest =
UpdateReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster2);
String errorMsg = String.format(
"Wrong behavior during update the subCluster %s according to reservation %s. " +
"The database is expected to update 1 record, " +
"but the number of database update records is greater than 1, " +
"the records of the database should be checked.",
subClusterId2, reservationId);
LambdaTestUtils.intercept(YarnException.class, errorMsg,
() -> stateStore.updateReservationHomeSubCluster(updateRequest));
}
/**
* This test case is used to verify the logic when calling the deleteReservationHomeSubCluster
* method if the database returns an inaccurate result.
*
* The probability of the database returning an update record greater than 1 is very low.
*
* @throws Exception when the error occurs
*/
@Test
public void testDeleteReservationHomeSubClusterAbnormalSituation() throws Exception {
FederationStateStore stateStore = getStateStore();
Assert.assertTrue(stateStore instanceof SQLFederationStateStore);
SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore;
Connection conn = sqlFederationStateStore.conn;
conn.prepareStatement(SP_DROP_DELETERESERVATIONHOMESUBCLUSTER).execute();
conn.prepareStatement(SP_DELETERESERVATIONHOMESUBCLUSTER2).execute();
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC");
// add Reservation data.
ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, subClusterId1);
AddReservationHomeSubClusterRequest addRequest =
AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
stateStore.addReservationHomeSubCluster(addRequest);
DeleteReservationHomeSubClusterRequest delRequest =
DeleteReservationHomeSubClusterRequest.newInstance(reservationId);
String errorMsg = String.format(
"Wrong behavior during deleting the reservation %s. " +
"The database is expected to delete 1 record, " +
"but the number of deleted records returned by the database is greater than 1, " +
"indicating that a duplicate reservationId occurred during the deletion process.",
reservationId);
LambdaTestUtils.intercept(YarnException.class, errorMsg,
() -> stateStore.deleteReservationHomeSubCluster(delRequest));
} }
} }