YARN-11328. Refactoring part of the code of SQLFederationStateStore. (#4976)

This commit is contained in:
slfan1989 2022-10-20 07:11:28 +08:00 committed by GitHub
parent 8aa04b0b24
commit 48b6f9f335
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 351 additions and 387 deletions

View File

@ -24,10 +24,10 @@ IF OBJECT_ID ( '[sp_addApplicationHomeSubCluster]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_addApplicationHomeSubCluster]
@applicationId VARCHAR(64),
@homeSubCluster VARCHAR(256),
@storedHomeSubCluster VARCHAR(256) OUTPUT,
@rowCount int OUTPUT
@applicationId_IN VARCHAR(64),
@homeSubCluster_IN VARCHAR(256),
@storedHomeSubCluster_OUT VARCHAR(256) OUTPUT,
@rowCount_OUT int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
@ -37,21 +37,21 @@ AS BEGIN
-- Otherwise don't change the current mapping.
IF NOT EXISTS (SELECT TOP 1 *
FROM [dbo].[applicationsHomeSubCluster]
WHERE [applicationId] = @applicationId)
WHERE [applicationId] = @applicationId_IN)
INSERT INTO [dbo].[applicationsHomeSubCluster] (
[applicationId],
[homeSubCluster])
VALUES (
@applicationId,
@homeSubCluster);
@applicationId_IN,
@homeSubCluster_IN);
-- End of the IF block
SELECT @rowCount = @@ROWCOUNT;
SELECT @rowCount_OUT = @@ROWCOUNT;
SELECT @storedHomeSubCluster = [homeSubCluster]
SELECT @storedHomeSubCluster_OUT = [homeSubCluster]
FROM [dbo].[applicationsHomeSubCluster]
WHERE [applicationId] = @applicationId;
WHERE [applicationId] = @applicationId_IN;
COMMIT TRAN
END TRY
@ -75,9 +75,9 @@ IF OBJECT_ID ( '[sp_updateApplicationHomeSubCluster]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_updateApplicationHomeSubCluster]
@applicationId VARCHAR(64),
@homeSubCluster VARCHAR(256),
@rowCount int OUTPUT
@applicationId_IN VARCHAR(64),
@homeSubCluster_IN VARCHAR(256),
@rowCount_OUT int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
@ -85,9 +85,9 @@ AS BEGIN
BEGIN TRAN
UPDATE [dbo].[applicationsHomeSubCluster]
SET [homeSubCluster] = @homeSubCluster
WHERE [applicationId] = @applicationid;
SELECT @rowCount = @@ROWCOUNT;
SET [homeSubCluster] = @homeSubCluster_IN
WHERE [applicationId] = @applicationId_IN;
SELECT @rowCount_OUT = @@ROWCOUNT;
COMMIT TRAN
END TRY
@ -111,8 +111,8 @@ IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster]
@limit int,
@homeSubCluster VARCHAR(256)
@limit_IN int,
@homeSubCluster_IN VARCHAR(256)
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
@ -128,8 +128,8 @@ AS BEGIN
[createTime],
row_number() over(order by [createTime] desc) AS app_rank
FROM [dbo].[applicationsHomeSubCluster]
WHERE [homeSubCluster] = @homeSubCluster OR @homeSubCluster = '') AS applicationsHomeSubCluster
WHERE app_rank <= @limit;
WHERE [homeSubCluster] = @homeSubCluster_IN OR @homeSubCluster = '') AS applicationsHomeSubCluster
WHERE app_rank <= @limit_IN;
END TRY
@ -150,16 +150,16 @@ IF OBJECT_ID ( '[sp_getApplicationHomeSubCluster]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_getApplicationHomeSubCluster]
@applicationId VARCHAR(64),
@homeSubCluster VARCHAR(256) OUTPUT
@applicationId_IN VARCHAR(64),
@homeSubCluster_OUT VARCHAR(256) OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
SELECT @homeSubCluster = [homeSubCluster]
SELECT @homeSubCluster_OUT = [homeSubCluster]
FROM [dbo].[applicationsHomeSubCluster]
WHERE [applicationId] = @applicationid;
WHERE [applicationId] = @applicationId_IN;
END TRY
@ -181,8 +181,8 @@ IF OBJECT_ID ( '[sp_deleteApplicationHomeSubCluster]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_deleteApplicationHomeSubCluster]
@applicationId VARCHAR(64),
@rowCount int OUTPUT
@applicationId_IN VARCHAR(64),
@rowCount_OUT int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
@ -190,8 +190,8 @@ AS BEGIN
BEGIN TRAN
DELETE FROM [dbo].[applicationsHomeSubCluster]
WHERE [applicationId] = @applicationId;
SELECT @rowCount = @@ROWCOUNT;
WHERE [applicationId] = @applicationId_IN;
SELECT @rowCount_OUT = @@ROWCOUNT;
COMMIT TRAN
END TRY
@ -215,15 +215,15 @@ IF OBJECT_ID ( '[sp_registerSubCluster]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_registerSubCluster]
@subClusterId VARCHAR(256),
@amRMServiceAddress VARCHAR(256),
@clientRMServiceAddress VARCHAR(256),
@rmAdminServiceAddress VARCHAR(256),
@rmWebServiceAddress VARCHAR(256),
@state VARCHAR(32),
@lastStartTime BIGINT,
@capability VARCHAR(6000),
@rowCount int OUTPUT
@subClusterId_IN VARCHAR(256),
@amRMServiceAddress_IN VARCHAR(256),
@clientRMServiceAddress_IN VARCHAR(256),
@rmAdminServiceAddress_IN VARCHAR(256),
@rmWebServiceAddress_IN VARCHAR(256),
@state_IN VARCHAR(32),
@lastStartTime_IN BIGINT,
@capability_IN VARCHAR(6000),
@rowCount_OUT int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
@ -231,7 +231,7 @@ AS BEGIN
BEGIN TRAN
DELETE FROM [dbo].[membership]
WHERE [subClusterId] = @subClusterId;
WHERE [subClusterId] = @subClusterId_IN;
INSERT INTO [dbo].[membership] (
[subClusterId],
[amRMServiceAddress],
@ -243,16 +243,16 @@ AS BEGIN
[lastStartTime],
[capability] )
VALUES (
@subClusterId,
@amRMServiceAddress,
@clientRMServiceAddress,
@rmAdminServiceAddress,
@rmWebServiceAddress,
@subClusterId_IN,
@amRMServiceAddress_IN,
@clientRMServiceAddress_IN,
@rmAdminServiceAddress_IN,
@rmWebServiceAddress_IN,
GETUTCDATE(),
@state,
@lastStartTime,
@capability);
SELECT @rowCount = @@ROWCOUNT;
@state_IN,
@lastStartTime_IN,
@capability_IN);
SELECT @rowCount_OUT = @@ROWCOUNT;
COMMIT TRAN
END TRY
@ -303,32 +303,32 @@ IF OBJECT_ID ( '[sp_getSubCluster]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_getSubCluster]
@subClusterId VARCHAR(256),
@amRMServiceAddress VARCHAR(256) OUTPUT,
@clientRMServiceAddress VARCHAR(256) OUTPUT,
@rmAdminServiceAddress VARCHAR(256) OUTPUT,
@rmWebServiceAddress VARCHAR(256) OUTPUT,
@lastHeartbeat DATETIME2 OUTPUT,
@state VARCHAR(256) OUTPUT,
@lastStartTime BIGINT OUTPUT,
@capability VARCHAR(6000) OUTPUT
@subClusterId_IN VARCHAR(256),
@amRMServiceAddress_OUT VARCHAR(256) OUTPUT,
@clientRMServiceAddress_OUT VARCHAR(256) OUTPUT,
@rmAdminServiceAddress_OUT VARCHAR(256) OUTPUT,
@rmWebServiceAddress_OUT VARCHAR(256) OUTPUT,
@lastHeartBeat_OUT DATETIME2 OUTPUT,
@state_OUT VARCHAR(256) OUTPUT,
@lastStartTime_OUT BIGINT OUTPUT,
@capability_OUT VARCHAR(6000) OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
BEGIN TRAN
SELECT @subClusterId = [subClusterId],
@amRMServiceAddress = [amRMServiceAddress],
@clientRMServiceAddress = [clientRMServiceAddress],
@rmAdminServiceAddress = [rmAdminServiceAddress],
@rmWebServiceAddress = [rmWebServiceAddress],
@lastHeartBeat = [lastHeartBeat],
@state = [state],
@lastStartTime = [lastStartTime],
@capability = [capability]
SELECT @subClusterId_IN = [subClusterId],
@amRMServiceAddress_OUT = [amRMServiceAddress],
@clientRMServiceAddress_OUT = [clientRMServiceAddress],
@rmAdminServiceAddress_OUT = [rmAdminServiceAddress],
@rmWebServiceAddress_OUT = [rmWebServiceAddress],
@lastHeartBeat_OUT = [lastHeartBeat],
@state_OUT = [state],
@lastStartTime_OUT = [lastStartTime],
@capability_OUT = [capability]
FROM [dbo].[membership]
WHERE [subClusterId] = @subClusterId
WHERE [subClusterId] = @subClusterId_IN
COMMIT TRAN
END TRY
@ -353,10 +353,10 @@ IF OBJECT_ID ( '[sp_subClusterHeartbeat]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_subClusterHeartbeat]
@subClusterId VARCHAR(256),
@state VARCHAR(256),
@capability VARCHAR(6000),
@rowCount int OUTPUT
@subClusterId_IN VARCHAR(256),
@state_IN VARCHAR(256),
@capability_IN VARCHAR(6000),
@rowCount_OUT int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
@ -364,11 +364,11 @@ AS BEGIN
BEGIN TRAN
UPDATE [dbo].[membership]
SET [state] = @state,
SET [state] = @state_IN,
[lastHeartbeat] = GETUTCDATE(),
[capability] = @capability
WHERE [subClusterId] = @subClusterId;
SELECT @rowCount = @@ROWCOUNT;
[capability] = @capability_IN
WHERE [subClusterId] = @subClusterId_IN;
SELECT @rowCount_OUT = @@ROWCOUNT;
COMMIT TRAN
END TRY
@ -392,9 +392,9 @@ IF OBJECT_ID ( '[sp_deregisterSubCluster]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_deregisterSubCluster]
@subClusterId VARCHAR(256),
@state VARCHAR(256),
@rowCount int OUTPUT
@subClusterId_IN VARCHAR(256),
@state_IN VARCHAR(256),
@rowCount_OUT int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
@ -402,9 +402,9 @@ AS BEGIN
BEGIN TRAN
UPDATE [dbo].[membership]
SET [state] = @state
WHERE [subClusterId] = @subClusterId;
SELECT @rowCount = @@ROWCOUNT;
SET [state] = @state_IN
WHERE [subClusterId] = @subClusterId_IN;
SELECT @rowCount_OUT = @@ROWCOUNT;
COMMIT TRAN
END TRY
@ -428,10 +428,10 @@ IF OBJECT_ID ( '[sp_setPolicyConfiguration]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_setPolicyConfiguration]
@queue VARCHAR(256),
@policyType VARCHAR(256),
@params VARBINARY(512),
@rowCount int OUTPUT
@queue_IN VARCHAR(256),
@policyType_IN VARCHAR(256),
@params_IN VARBINARY(512),
@rowCount_OUT int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
@ -439,16 +439,16 @@ AS BEGIN
BEGIN TRAN
DELETE FROM [dbo].[policies]
WHERE [queue] = @queue;
WHERE [queue] = @queue_IN;
INSERT INTO [dbo].[policies] (
[queue],
[policyType],
[params])
VALUES (
@queue,
@policyType,
@params);
SELECT @rowCount = @@ROWCOUNT;
@queue_IN,
@policyType_IN,
@params_IN);
SELECT @rowCount_OUT = @@ROWCOUNT;
COMMIT TRAN
END TRY
@ -472,18 +472,18 @@ IF OBJECT_ID ( '[sp_getPolicyConfiguration]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_getPolicyConfiguration]
@queue VARCHAR(256),
@policyType VARCHAR(256) OUTPUT,
@params VARBINARY(6000) OUTPUT
@queue_IN VARCHAR(256),
@policyType_OUT VARCHAR(256) OUTPUT,
@params_OUT VARBINARY(6000) OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
SELECT @policyType = [policyType],
@params = [params]
SELECT @policyType_OUT = [policyType],
@params_OUT = [params]
FROM [dbo].[policies]
WHERE [queue] = @queue
WHERE [queue] = @queue_IN
END TRY
@ -524,15 +524,15 @@ AS BEGIN
END;
GO
IF OBJECT_ID ( '[sp_addApplicationHomeSubCluster]', 'P' ) IS NOT NULL
DROP PROCEDURE [sp_addApplicationHomeSubCluster];
IF OBJECT_ID ( '[sp_addReservationHomeSubCluster]', 'P' ) IS NOT NULL
DROP PROCEDURE [sp_addReservationHomeSubCluster];
GO
CREATE PROCEDURE [dbo].[sp_addReservationHomeSubCluster]
@reservationId VARCHAR(128),
@homeSubCluster VARCHAR(256),
@storedHomeSubCluster VARCHAR(256) OUTPUT,
@rowCount int OUTPUT
@reservationId_IN VARCHAR(128),
@homeSubCluster_IN VARCHAR(256),
@storedHomeSubCluster_OUT VARCHAR(256) OUTPUT,
@rowCount_OUT int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
@ -542,21 +542,21 @@ AS BEGIN
-- Otherwise don't change the current mapping.
IF NOT EXISTS (SELECT TOP 1 *
FROM [dbo].[reservationsHomeSubCluster]
WHERE [reservationId] = @reservationId)
WHERE [reservationId] = @reservationId_IN)
INSERT INTO [dbo].[reservationsHomeSubCluster] (
[reservationId],
[homeSubCluster])
VALUES (
@reservationId,
@homeSubCluster);
@reservationId_IN,
@homeSubCluster_IN);
-- End of the IF block
SELECT @rowCount = @@ROWCOUNT;
SELECT @rowCount_OUT = @@ROWCOUNT;
SELECT @storedHomeSubCluster = [homeSubCluster]
SELECT @storedHomeSubCluster_OUT = [homeSubCluster]
FROM [dbo].[reservationsHomeSubCluster]
WHERE [reservationId] = @reservationId;
WHERE [reservationId] = @reservationId_IN;
COMMIT TRAN
END TRY
@ -580,9 +580,9 @@ IF OBJECT_ID ( '[sp_updateReservationHomeSubCluster]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_updateReservationHomeSubCluster]
@reservationId VARCHAR(128),
@homeSubCluster VARCHAR(256),
@rowCount int OUTPUT
@reservationId_IN VARCHAR(128),
@homeSubCluster_IN VARCHAR(256),
@rowCount_OUT int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
@ -590,9 +590,9 @@ AS BEGIN
BEGIN TRAN
UPDATE [dbo].[reservationsHomeSubCluster]
SET [homeSubCluster] = @homeSubCluster
WHERE [reservationId] = @reservationId;
SELECT @rowCount = @@ROWCOUNT;
SET [homeSubCluster] = @homeSubCluster_IN
WHERE [reservationId] = @reservationId_IN;
SELECT @rowCount_OUT = @@ROWCOUNT;
COMMIT TRAN
END TRY
@ -641,16 +641,16 @@ IF OBJECT_ID ( '[sp_getReservationHomeSubCluster]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_getReservationHomeSubCluster]
@reservationId VARCHAR(128),
@homeSubCluster VARCHAR(256) OUTPUT
@reservationId_IN VARCHAR(128),
@homeSubCluster_OUT VARCHAR(256) OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
SELECT @homeSubCluster = [homeSubCluster]
SELECT @homeSubCluster_OUT = [homeSubCluster]
FROM [dbo].[reservationsHomeSubCluster]
WHERE [reservationId] = @reservationId;
WHERE [reservationId] = @reservationId_IN;
END TRY
@ -672,8 +672,8 @@ IF OBJECT_ID ( '[sp_deleteReservationHomeSubCluster]', 'P' ) IS NOT NULL
GO
CREATE PROCEDURE [dbo].[sp_deleteReservationHomeSubCluster]
@reservationId VARCHAR(128),
@rowCount int OUTPUT
@reservationId_IN VARCHAR(128),
@rowCount_OUT int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
@ -681,8 +681,8 @@ AS BEGIN
BEGIN TRAN
DELETE FROM [dbo].[reservationsHomeSubCluster]
WHERE [reservationId] = @reservationId;
SELECT @rowCount = @@ROWCOUNT;
WHERE [reservationId] = @reservationId_IN;
SELECT @rowCount_OUT = @@ROWCOUNT;
COMMIT TRAN
END TRY

View File

@ -177,7 +177,7 @@ public class SQLFederationStateStore implements FederationStateStore {
private HikariDataSource dataSource = null;
private final Clock clock = new MonotonicClock();
@VisibleForTesting
Connection conn = null;
private Connection conn = null;
private int maxAppsInStateStore;
@Override
@ -197,8 +197,7 @@ public void init(Configuration conf) throws YarnException {
try {
Class.forName(driverClass);
} catch (ClassNotFoundException e) {
FederationStateStoreUtils.logAndThrowException(LOG,
"Driver class not found.", e);
FederationStateStoreUtils.logAndThrowException(LOG, "Driver class not found.", e);
}
// Create the data source to pool connections in a thread-safe manner
@ -209,14 +208,14 @@ public void init(Configuration conf) throws YarnException {
FederationStateStoreUtils.setProperty(dataSource,
FederationStateStoreUtils.FEDERATION_STORE_URL, url);
dataSource.setMaximumPoolSize(maximumPoolSize);
LOG.info("Initialized connection pool to the Federation StateStore "
+ "database at address: " + url);
LOG.info("Initialized connection pool to the Federation StateStore database at address: {}.",
url);
try {
conn = getConnection();
LOG.debug("Connection created");
} catch (SQLException e) {
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Not able to get Connection", e);
FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Not able to get Connection", e);
}
maxAppsInStateStore = conf.getInt(
@ -226,32 +225,29 @@ public void init(Configuration conf) throws YarnException {
@Override
public SubClusterRegisterResponse registerSubCluster(
SubClusterRegisterRequest registerSubClusterRequest)
throws YarnException {
SubClusterRegisterRequest registerSubClusterRequest) throws YarnException {
// Input validator
FederationMembershipStateStoreInputValidator
.validate(registerSubClusterRequest);
FederationMembershipStateStoreInputValidator.validate(registerSubClusterRequest);
CallableStatement cstmt = null;
SubClusterInfo subClusterInfo =
registerSubClusterRequest.getSubClusterInfo();
SubClusterInfo subClusterInfo = registerSubClusterRequest.getSubClusterInfo();
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
try {
cstmt = getCallableStatement(CALL_SP_REGISTER_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, subClusterId.getId());
cstmt.setString(2, subClusterInfo.getAMRMServiceAddress());
cstmt.setString(3, subClusterInfo.getClientRMServiceAddress());
cstmt.setString(4, subClusterInfo.getRMAdminServiceAddress());
cstmt.setString(5, subClusterInfo.getRMWebServiceAddress());
cstmt.setString(6, subClusterInfo.getState().toString());
cstmt.setLong(7, subClusterInfo.getLastStartTime());
cstmt.setString(8, subClusterInfo.getCapability());
cstmt.registerOutParameter(9, java.sql.Types.INTEGER);
cstmt.setString("subClusterId_IN", subClusterId.getId());
cstmt.setString("amRMServiceAddress_IN", subClusterInfo.getAMRMServiceAddress());
cstmt.setString("clientRMServiceAddress_IN", subClusterInfo.getClientRMServiceAddress());
cstmt.setString("rmAdminServiceAddress_IN", subClusterInfo.getRMAdminServiceAddress());
cstmt.setString("rmWebServiceAddress_IN", subClusterInfo.getRMWebServiceAddress());
cstmt.setString("state_IN", subClusterInfo.getState().toString());
cstmt.setLong("lastStartTime_IN", subClusterInfo.getLastStartTime());
cstmt.setString("capability_IN", subClusterInfo.getCapability());
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
@ -260,30 +256,26 @@ public SubClusterRegisterResponse registerSubCluster(
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not add a new subcluster into FederationStateStore
if (cstmt.getInt(9) == 0) {
String errMsg = "SubCluster " + subClusterId
+ " was not registered into the StateStore";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
int rowCount = cstmt.getInt("rowCount_OUT");
if (rowCount == 0) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"SubCluster %s was not registered into the StateStore.", subClusterId);
}
// Check the ROWCOUNT value, if it is different from 1 it means the call
// had a wrong behavior. Maybe the database is not set correctly.
if (cstmt.getInt(9) != 1) {
String errMsg = "Wrong behavior during registration of SubCluster "
+ subClusterId + " into the StateStore";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
if (rowCount != 1) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Wrong behavior during registration of SubCluster %s into the StateStore",
subClusterId);
}
LOG.info(
"Registered the SubCluster " + subClusterId + " into the StateStore");
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
LOG.info("Registered the SubCluster {} into the StateStore.", subClusterId);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to register the SubCluster " + subClusterId
+ " into the StateStore",
e);
FederationStateStoreUtils.logAndThrowRetriableException(e,
LOG, "Unable to register the SubCluster %s into the StateStore.", subClusterId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
@ -294,12 +286,10 @@ public SubClusterRegisterResponse registerSubCluster(
@Override
public SubClusterDeregisterResponse deregisterSubCluster(
SubClusterDeregisterRequest subClusterDeregisterRequest)
throws YarnException {
SubClusterDeregisterRequest subClusterDeregisterRequest) throws YarnException {
// Input validator
FederationMembershipStateStoreInputValidator
.validate(subClusterDeregisterRequest);
FederationMembershipStateStoreInputValidator.validate(subClusterDeregisterRequest);
CallableStatement cstmt = null;
@ -310,9 +300,9 @@ public SubClusterDeregisterResponse deregisterSubCluster(
cstmt = getCallableStatement(CALL_SP_DEREGISTER_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, subClusterId.getId());
cstmt.setString(2, state.toString());
cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
cstmt.setString("subClusterId_IN", subClusterId.getId());
cstmt.setString("state_IN", state.toString());
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
@ -321,29 +311,25 @@ public SubClusterDeregisterResponse deregisterSubCluster(
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not deregister the subcluster into FederationStateStore
if (cstmt.getInt(3) == 0) {
String errMsg = "SubCluster " + subClusterId + " not found";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
int rowCount = cstmt.getInt("rowCount_OUT");
if (rowCount == 0) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"SubCluster %s not found.", subClusterId);
}
// Check the ROWCOUNT value, if it is different from 1 it means the call
// had a wrong behavior. Maybe the database is not set correctly.
if (cstmt.getInt(3) != 1) {
String errMsg = "Wrong behavior during deregistration of SubCluster "
+ subClusterId + " from the StateStore";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
if (rowCount != 1) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Wrong behavior during deregistration of SubCluster %s from the StateStore.",
subClusterId);
}
LOG.info("Deregistered the SubCluster " + subClusterId + " state to "
+ state.toString());
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
LOG.info("Deregistered the SubCluster {} state to {}.", subClusterId, state.toString());
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to deregister the sub-cluster " + subClusterId + " state to "
+ state.toString(),
e);
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to deregister the sub-cluster %s state to %s.", subClusterId, state.toString());
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
@ -353,12 +339,10 @@ public SubClusterDeregisterResponse deregisterSubCluster(
@Override
public SubClusterHeartbeatResponse subClusterHeartbeat(
SubClusterHeartbeatRequest subClusterHeartbeatRequest)
throws YarnException {
SubClusterHeartbeatRequest subClusterHeartbeatRequest) throws YarnException {
// Input validator
FederationMembershipStateStoreInputValidator
.validate(subClusterHeartbeatRequest);
FederationMembershipStateStoreInputValidator.validate(subClusterHeartbeatRequest);
CallableStatement cstmt = null;
@ -369,10 +353,10 @@ public SubClusterHeartbeatResponse subClusterHeartbeat(
cstmt = getCallableStatement(CALL_SP_SUBCLUSTER_HEARTBEAT);
// Set the parameters for the stored procedure
cstmt.setString(1, subClusterId.getId());
cstmt.setString(2, state.toString());
cstmt.setString(3, subClusterHeartbeatRequest.getCapability());
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
cstmt.setString("subClusterId_IN", subClusterId.getId());
cstmt.setString("state_IN", state.toString());
cstmt.setString("capability_IN", subClusterHeartbeatRequest.getCapability());
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
@ -381,30 +365,25 @@ public SubClusterHeartbeatResponse subClusterHeartbeat(
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not update the subcluster into FederationStateStore
if (cstmt.getInt(4) == 0) {
String errMsg = "SubCluster " + subClusterId.toString()
+ " does not exist; cannot heartbeat";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
int rowCount = cstmt.getInt("rowCount_OUT");
if (rowCount == 0) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"SubCluster %s does not exist; cannot heartbeat.", subClusterId);
}
// Check the ROWCOUNT value, if it is different from 1 it means the call
// had a wrong behavior. Maybe the database is not set correctly.
if (cstmt.getInt(4) != 1) {
String errMsg =
"Wrong behavior during the heartbeat of SubCluster " + subClusterId;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
if (rowCount != 1) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Wrong behavior during the heartbeat of SubCluster %s.", subClusterId);
}
LOG.info("Heartbeated the StateStore for the specified SubCluster "
+ subClusterId);
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
LOG.info("Heartbeated the StateStore for the specified SubCluster {}.", subClusterId);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to heartbeat the StateStore for the specified SubCluster "
+ subClusterId,
e);
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to heartbeat the StateStore for the specified SubCluster %s.", subClusterId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
@ -426,27 +405,27 @@ public GetSubClusterInfoResponse getSubCluster(
try {
cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTER);
cstmt.setString(1, subClusterId.getId());
cstmt.setString("subClusterId_IN", subClusterId.getId());
// Set the parameters for the stored procedure
cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
cstmt.registerOutParameter(3, java.sql.Types.VARCHAR);
cstmt.registerOutParameter(4, java.sql.Types.VARCHAR);
cstmt.registerOutParameter(5, java.sql.Types.VARCHAR);
cstmt.registerOutParameter(6, java.sql.Types.TIMESTAMP);
cstmt.registerOutParameter(7, java.sql.Types.VARCHAR);
cstmt.registerOutParameter(8, java.sql.Types.BIGINT);
cstmt.registerOutParameter(9, java.sql.Types.VARCHAR);
cstmt.registerOutParameter("amRMServiceAddress_OUT", java.sql.Types.VARCHAR);
cstmt.registerOutParameter("clientRMServiceAddress_OUT", java.sql.Types.VARCHAR);
cstmt.registerOutParameter("rmAdminServiceAddress_OUT", java.sql.Types.VARCHAR);
cstmt.registerOutParameter("rmWebServiceAddress_OUT", java.sql.Types.VARCHAR);
cstmt.registerOutParameter("lastHeartBeat_OUT", java.sql.Types.TIMESTAMP);
cstmt.registerOutParameter("state_OUT", java.sql.Types.VARCHAR);
cstmt.registerOutParameter("lastStartTime_OUT", java.sql.Types.BIGINT);
cstmt.registerOutParameter("capability_OUT", java.sql.Types.VARCHAR);
// Execute the query
long startTime = clock.getTime();
cstmt.execute();
long stopTime = clock.getTime();
String amRMAddress = cstmt.getString(2);
String clientRMAddress = cstmt.getString(3);
String rmAdminAddress = cstmt.getString(4);
String webAppAddress = cstmt.getString(5);
String amRMAddress = cstmt.getString("amRMServiceAddress_OUT");
String clientRMAddress = cstmt.getString("clientRMServiceAddress_OUT");
String rmAdminAddress = cstmt.getString("rmAdminServiceAddress_OUT");
String webAppAddress = cstmt.getString("rmWebServiceAddress_OUT");
// first check if the subCluster exists
if((amRMAddress == null) || (clientRMAddress == null)) {
@ -454,36 +433,31 @@ public GetSubClusterInfoResponse getSubCluster(
return null;
}
Timestamp heartBeatTimeStamp = cstmt.getTimestamp(6, utcCalendar);
long lastHeartBeat =
heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0;
Timestamp heartBeatTimeStamp = cstmt.getTimestamp("lastHeartBeat_OUT", utcCalendar);
long lastHeartBeat = heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0;
SubClusterState state = SubClusterState.fromString(cstmt.getString(7));
long lastStartTime = cstmt.getLong(8);
String capability = cstmt.getString(9);
SubClusterState state = SubClusterState.fromString(cstmt.getString("state_OUT"));
long lastStartTime = cstmt.getLong("lastStartTime_OUT");
String capability = cstmt.getString("capability_OUT");
subClusterInfo = SubClusterInfo.newInstance(subClusterId, amRMAddress,
clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state,
lastStartTime, capability);
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
// Check if the output it is a valid subcluster
try {
FederationMembershipStateStoreInputValidator
.checkSubClusterInfo(subClusterInfo);
FederationMembershipStateStoreInputValidator.checkSubClusterInfo(subClusterInfo);
} catch (FederationStateStoreInvalidInputException e) {
String errMsg =
"SubCluster " + subClusterId.toString() + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
FederationStateStoreUtils.logAndThrowStoreException(e, LOG,
"SubCluster %s does not exist.", subClusterId);
}
LOG.debug("Got the information about the specified SubCluster {}",
subClusterInfo);
LOG.debug("Got the information about the specified SubCluster {}", subClusterInfo);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the SubCluster information for " + subClusterId, e);
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to obtain the SubCluster information for %s.", subClusterId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
@ -496,7 +470,7 @@ public GetSubClustersInfoResponse getSubClusters(
GetSubClustersInfoRequest subClustersRequest) throws YarnException {
CallableStatement cstmt = null;
ResultSet rs = null;
List<SubClusterInfo> subClusters = new ArrayList<SubClusterInfo>();
List<SubClusterInfo> subClusters = new ArrayList<>();
try {
cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTERS);
@ -509,15 +483,15 @@ public GetSubClustersInfoResponse getSubClusters(
while (rs.next()) {
// Extract the output for each tuple
String subClusterName = rs.getString(1);
String amRMAddress = rs.getString(2);
String clientRMAddress = rs.getString(3);
String rmAdminAddress = rs.getString(4);
String webAppAddress = rs.getString(5);
long lastHeartBeat = rs.getTimestamp(6, utcCalendar).getTime();
SubClusterState state = SubClusterState.fromString(rs.getString(7));
long lastStartTime = rs.getLong(8);
String capability = rs.getString(9);
String subClusterName = rs.getString("subClusterId");
String amRMAddress = rs.getString("amRMServiceAddress");
String clientRMAddress = rs.getString("clientRMServiceAddress");
String rmAdminAddress = rs.getString("rmAdminServiceAddress");
String webAppAddress = rs.getString("rmWebServiceAddress");
long lastHeartBeat = rs.getTimestamp("lastHeartBeat", utcCalendar).getTime();
SubClusterState state = SubClusterState.fromString(rs.getString("state"));
long lastStartTime = rs.getLong("lastStartTime");
String capability = rs.getString("capability");
SubClusterId subClusterId = SubClusterId.newInstance(subClusterName);
SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
@ -527,15 +501,12 @@ public GetSubClustersInfoResponse getSubClusters(
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
// Check if the output it is a valid subcluster
try {
FederationMembershipStateStoreInputValidator
.checkSubClusterInfo(subClusterInfo);
FederationMembershipStateStoreInputValidator.checkSubClusterInfo(subClusterInfo);
} catch (FederationStateStoreInvalidInputException e) {
String errMsg =
"SubCluster " + subClusterId.toString() + " is not valid";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
FederationStateStoreUtils.logAndThrowStoreException(e, LOG,
"SubCluster %s is not valid.", subClusterId);
}
// Filter the inactive
@ -575,68 +546,61 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
cstmt = getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, appId.toString());
cstmt.setString(2, subClusterId.getId());
cstmt.registerOutParameter(3, java.sql.Types.VARCHAR);
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
cstmt.setString("applicationId_IN", appId.toString());
cstmt.setString("homeSubCluster_IN", subClusterId.getId());
cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR);
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
subClusterHome = cstmt.getString(3);
subClusterHome = cstmt.getString("storedHomeSubCluster_OUT");
SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome);
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
// For failover reason, we check the returned SubClusterId.
// If it is equal to the subclusterId we sent, the call added the new
// application into FederationStateStore. If the call returns a different
// SubClusterId it means we already tried to insert this application but a
// component (Router/StateStore/RM) failed during the submission.
int rowCount = cstmt.getInt("rowCount_OUT");
if (subClusterId.equals(subClusterIdHome)) {
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not add a new application into FederationStateStore
if (cstmt.getInt(4) == 0) {
LOG.info(
"The application {} was not inserted in the StateStore because it"
+ " was already present in SubCluster {}",
appId, subClusterHome);
} else if (cstmt.getInt(4) != 1) {
if (rowCount == 0) {
LOG.info("The application {} was not inserted in the StateStore because it"
+ " was already present in SubCluster {}", appId, subClusterHome);
} else if (cstmt.getInt("rowCount_OUT") != 1) {
// Check the ROWCOUNT value, if it is different from 1 it means the
// call had a wrong behavior. Maybe the database is not set correctly.
String errMsg = "Wrong behavior during the insertion of SubCluster "
+ subClusterId;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Wrong behavior during the insertion of SubCluster %s.", subClusterId);
}
LOG.info("Insert into the StateStore the application: " + appId
+ " in SubCluster: " + subClusterHome);
LOG.info("Insert into the StateStore the application: {} in SubCluster: {}.",
appId, subClusterHome);
} else {
// Check the ROWCOUNT value, if it is different from 0 it means the call
// did edited the table
if (cstmt.getInt(4) != 0) {
String errMsg =
"The application " + appId + " does exist but was overwritten";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
if (rowCount != 0) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"The application %s does exist but was overwritten.", appId);
}
LOG.info("Application: " + appId + " already present with SubCluster: "
+ subClusterHome);
LOG.info("Application: {} already present with SubCluster: {}.", appId, subClusterHome);
}
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils
.logAndThrowRetriableException(LOG,
"Unable to insert the newly generated application "
+ request.getApplicationHomeSubCluster().getApplicationId(),
e);
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to insert the newly generated application %s.", appId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return AddApplicationHomeSubClusterResponse
.newInstance(SubClusterId.newInstance(subClusterHome));
}
@ -659,9 +623,9 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
cstmt = getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, appId.toString());
cstmt.setString(2, subClusterId.getId());
cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
cstmt.setString("applicationId_IN", appId.toString());
cstmt.setString("homeSubCluster_IN", subClusterId.getId());
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
@ -670,31 +634,25 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not update the application into FederationStateStore
if (cstmt.getInt(3) == 0) {
String errMsg = "Application " + appId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
int rowCount = cstmt.getInt("rowCount_OUT");
if (rowCount == 0) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Application %s does not exist.", appId);
}
// Check the ROWCOUNT value, if it is different from 1 it means the call
// had a wrong behavior. Maybe the database is not set correctly.
if (cstmt.getInt(3) != 1) {
String errMsg =
"Wrong behavior during the update of SubCluster " + subClusterId;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
if (cstmt.getInt("rowCount_OUT") != 1) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Wrong behavior during the update of SubCluster %s.", subClusterId);
}
LOG.info(
"Update the SubCluster to {} for application {} in the StateStore",
LOG.info("Update the SubCluster to {} for application {} in the StateStore",
subClusterId, appId);
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils
.logAndThrowRetriableException(LOG,
"Unable to update the application "
+ request.getApplicationHomeSubCluster().getApplicationId(),
e);
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to update the application %s.", appId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
@ -712,44 +670,43 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
SubClusterId homeRM = null;
ApplicationId applicationId = request.getApplicationId();
try {
cstmt = getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, request.getApplicationId().toString());
cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
cstmt.setString("applicationId_IN", applicationId.toString());
cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
// Execute the query
long startTime = clock.getTime();
cstmt.execute();
long stopTime = clock.getTime();
if (cstmt.getString(2) != null) {
homeRM = SubClusterId.newInstance(cstmt.getString(2));
String homeSubCluster = cstmt.getString("homeSubCluster_OUT");
if (homeSubCluster != null) {
homeRM = SubClusterId.newInstance(homeSubCluster);
} else {
String errMsg =
"Application " + request.getApplicationId() + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Application %s does not exist.", applicationId);
}
LOG.debug("Got the information about the specified application {}."
+ " The AM is running in {}", request.getApplicationId(), homeRM);
+ " The AM is running in {}", applicationId, homeRM);
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the application information "
+ "for the specified application " + request.getApplicationId(),
e);
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to obtain the application information for the specified application %s.",
applicationId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return GetApplicationHomeSubClusterResponse
.newInstance(request.getApplicationId(), homeRM);
return GetApplicationHomeSubClusterResponse.newInstance(request.getApplicationId(), homeRM);
}
@Override
@ -790,8 +747,7 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
SubClusterId.newInstance(homeSubCluster)));
}
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
@ -813,13 +769,13 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
CallableStatement cstmt = null;
ApplicationId applicationId = request.getApplicationId();
try {
cstmt = getCallableStatement(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, request.getApplicationId().toString());
cstmt.registerOutParameter(2, java.sql.Types.INTEGER);
cstmt.setString("applicationId_IN", applicationId.toString());
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
@ -828,28 +784,25 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not delete the application from FederationStateStore
if (cstmt.getInt(2) == 0) {
String errMsg =
"Application " + request.getApplicationId() + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
int rowCount = cstmt.getInt("rowCount_OUT");
if (rowCount == 0) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Application %s does not exist.", applicationId);
}
// Check the ROWCOUNT value, if it is different from 1 it means the call
// had a wrong behavior. Maybe the database is not set correctly.
if (cstmt.getInt(2) != 1) {
String errMsg = "Wrong behavior during deleting the application "
+ request.getApplicationId();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
if (cstmt.getInt("rowCount_OUT") != 1) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Wrong behavior during deleting the application %s.", applicationId);
}
LOG.info("Delete from the StateStore the application: {}",
request.getApplicationId());
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
LOG.info("Delete from the StateStore the application: {}", request.getApplicationId());
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to delete the application " + request.getApplicationId(), e);
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to delete the application %s.", applicationId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
@ -871,9 +824,9 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
cstmt = getCallableStatement(CALL_SP_GET_POLICY_CONFIGURATION);
// Set the parameters for the stored procedure
cstmt.setString(1, request.getQueue());
cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
cstmt.registerOutParameter(3, java.sql.Types.VARBINARY);
cstmt.setString("queue_IN", request.getQueue());
cstmt.registerOutParameter("policyType_OUT", java.sql.Types.VARCHAR);
cstmt.registerOutParameter("params_OUT", java.sql.Types.VARBINARY);
// Execute the query
long startTime = clock.getTime();
@ -881,10 +834,11 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
long stopTime = clock.getTime();
// Check if the output it is a valid policy
if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) {
subClusterPolicyConfiguration =
SubClusterPolicyConfiguration.newInstance(request.getQueue(),
cstmt.getString(2), ByteBuffer.wrap(cstmt.getBytes(3)));
String policyType = cstmt.getString("policyType_OUT");
byte[] param = cstmt.getBytes("params_OUT");
if (policyType != null && param != null) {
subClusterPolicyConfiguration = SubClusterPolicyConfiguration.newInstance(
request.getQueue(), policyType, ByteBuffer.wrap(param));
LOG.debug("Selected from StateStore the policy for the queue: {}",
subClusterPolicyConfiguration);
} else {
@ -892,20 +846,17 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
return null;
}
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to select the policy for the queue :" + request.getQueue(),
e);
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to select the policy for the queue : %s." + request.getQueue());
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return GetSubClusterPolicyConfigurationResponse
.newInstance(subClusterPolicyConfiguration);
return GetSubClusterPolicyConfigurationResponse.newInstance(subClusterPolicyConfiguration);
}
@Override
@ -923,10 +874,10 @@ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
cstmt = getCallableStatement(CALL_SP_SET_POLICY_CONFIGURATION);
// Set the parameters for the stored procedure
cstmt.setString(1, policyConf.getQueue());
cstmt.setString(2, policyConf.getType());
cstmt.setBytes(3, getByteArray(policyConf.getParams()));
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
cstmt.setString("queue_IN", policyConf.getQueue());
cstmt.setString("policyType_IN", policyConf.getType());
cstmt.setBytes("params_IN", getByteArray(policyConf.getParams()));
cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
@ -935,30 +886,25 @@ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not add a new policy into FederationStateStore
if (cstmt.getInt(4) == 0) {
String errMsg = "The policy " + policyConf.getQueue()
+ " was not insert into the StateStore";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
int rowCount = cstmt.getInt("rowCount_OUT");
if (rowCount == 0) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"The policy %s was not insert into the StateStore.", policyConf.getQueue());
}
// Check the ROWCOUNT value, if it is different from 1 it means the call
// had a wrong behavior. Maybe the database is not set correctly.
if (cstmt.getInt(4) != 1) {
String errMsg =
"Wrong behavior during insert the policy " + policyConf.getQueue();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
if (rowCount != 1) {
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Wrong behavior during insert the policy %s.", policyConf.getQueue());
}
LOG.info("Insert into the state store the policy for the queue: "
+ policyConf.getQueue());
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
LOG.info("Insert into the state store the policy for the queue: {}.", policyConf.getQueue());
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to insert the newly generated policy for the queue :"
+ policyConf.getQueue(),
e);
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to insert the newly generated policy for the queue : %s.", policyConf.getQueue());
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
@ -972,8 +918,7 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
CallableStatement cstmt = null;
ResultSet rs = null;
List<SubClusterPolicyConfiguration> policyConfigurations =
new ArrayList<SubClusterPolicyConfiguration>();
List<SubClusterPolicyConfiguration> policyConfigurations = new ArrayList<>();
try {
cstmt = getCallableStatement(CALL_SP_GET_POLICIES_CONFIGURATIONS);
@ -984,20 +929,17 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
long stopTime = clock.getTime();
while (rs.next()) {
// Extract the output for each tuple
String queue = rs.getString(1);
String type = rs.getString(2);
byte[] policyInfo = rs.getBytes(3);
String queue = rs.getString("queue");
String type = rs.getString("policyType");
byte[] policyInfo = rs.getBytes("params");
SubClusterPolicyConfiguration subClusterPolicyConfiguration =
SubClusterPolicyConfiguration.newInstance(queue, type,
ByteBuffer.wrap(policyInfo));
SubClusterPolicyConfiguration.newInstance(queue, type, ByteBuffer.wrap(policyInfo));
policyConfigurations.add(subClusterPolicyConfiguration);
}
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
@ -1008,8 +950,7 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
}
return GetSubClusterPoliciesConfigurationsResponse
.newInstance(policyConfigurations);
return GetSubClusterPoliciesConfigurationsResponse.newInstance(policyConfigurations);
}
@Override

View File

@ -162,6 +162,29 @@ public static void logAndThrowStoreException(Logger log, String errMsgFormat, Ob
throw new FederationStateStoreException(errMsg);
}
/**
* Throws an <code>FederationStateStoreException</code> due to an error in
* <code>FederationStateStore</code>.
*
* @param t the throwable raised in the called class.
* @param log the logger interface.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @throws YarnException on failure
*/
public static void logAndThrowStoreException(
Throwable t, Logger log, String errMsgFormat, Object... args) throws YarnException {
String errMsg = String.format(errMsgFormat, args);
if (t != null) {
log.error(errMsg, t);
throw new FederationStateStoreException(errMsg, t);
} else {
log.error(errMsg);
throw new FederationStateStoreException(errMsg);
}
}
/**
* Throws an <code>FederationStateStoreInvalidInputException</code> due to an
* error in <code>FederationStateStore</code>.

View File

@ -325,7 +325,7 @@ public void init(Configuration conf) {
try {
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
super.init(conf);
conn = super.conn;
conn = super.getConn();
LOG.info("Database Init: Start");
@ -365,7 +365,7 @@ public void init(Configuration conf) {
public void initConnection(Configuration conf) {
try {
super.init(conf);
conn = super.conn;
conn = super.getConn();
} catch (YarnException e1) {
LOG.error("ERROR: failed open connection to HSQLDB DB {}.", e1.getMessage());
}

View File

@ -447,7 +447,7 @@ public void testAddReservationHomeSubClusterAbnormalSituation() throws Exception
SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore;
Connection conn = sqlFederationStateStore.conn;
Connection conn = sqlFederationStateStore.getConn();
conn.prepareStatement(SP_DROP_ADDRESERVATIONHOMESUBCLUSTER).execute();
conn.prepareStatement(SP_ADDRESERVATIONHOMESUBCLUSTER2).execute();
@ -484,7 +484,7 @@ public void testUpdateReservationHomeSubClusterAbnormalSituation() throws Except
SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore;
Connection conn = sqlFederationStateStore.conn;
Connection conn = sqlFederationStateStore.getConn();
conn.prepareStatement(SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER).execute();
conn.prepareStatement(SP_UPDATERESERVATIONHOMESUBCLUSTER2).execute();
@ -530,7 +530,7 @@ public void testDeleteReservationHomeSubClusterAbnormalSituation() throws Except
SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore;
Connection conn = sqlFederationStateStore.conn;
Connection conn = sqlFederationStateStore.getConn();
conn.prepareStatement(SP_DROP_DELETERESERVATIONHOMESUBCLUSTER).execute();
conn.prepareStatement(SP_DELETERESERVATIONHOMESUBCLUSTER2).execute();