YARN-11478. [Federation] SQLFederationStateStore Support Store ApplicationSubmitData. (#5663)

This commit is contained in:
slfan1989 2023-05-25 02:43:20 +08:00 committed by GitHub
parent e6b54f7f68
commit b977065cc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 126 additions and 101 deletions

View File

@ -18,7 +18,7 @@
-- Script to generate all the stored procedures for the Federation StateStore in MySQL -- Script to generate all the stored procedures for the Federation StateStore in MySQL
USE FederationStateStore USE FederationStateStore;
DELIMITER // DELIMITER //
@ -89,11 +89,12 @@ END //
CREATE PROCEDURE sp_addApplicationHomeSubCluster( CREATE PROCEDURE sp_addApplicationHomeSubCluster(
IN applicationId_IN varchar(64), IN homeSubCluster_IN varchar(256), IN applicationId_IN varchar(64), IN homeSubCluster_IN varchar(256),
IN applicationContext_IN BLOB,
OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int) OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)
BEGIN BEGIN
INSERT INTO applicationsHomeSubCluster INSERT INTO applicationsHomeSubCluster
(applicationId,homeSubCluster) (applicationId, homeSubCluster, createTime, applicationContext)
(SELECT applicationId_IN, homeSubCluster_IN (SELECT applicationId_IN, homeSubCluster_IN, NOW(), applicationContext_IN
FROM applicationsHomeSubCluster FROM applicationsHomeSubCluster
WHERE applicationId = applicationId_IN WHERE applicationId = applicationId_IN
HAVING COUNT(*) = 0 ); HAVING COUNT(*) = 0 );
@ -105,19 +106,23 @@ END //
CREATE PROCEDURE sp_updateApplicationHomeSubCluster( CREATE PROCEDURE sp_updateApplicationHomeSubCluster(
IN applicationId_IN varchar(64), IN applicationId_IN varchar(64),
IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int) IN homeSubCluster_IN varchar(256), IN applicationContext_IN BLOB, OUT rowCount_OUT int)
BEGIN BEGIN
UPDATE applicationsHomeSubCluster UPDATE applicationsHomeSubCluster
SET homeSubCluster = homeSubCluster_IN SET homeSubCluster = homeSubCluster_IN,
applicationContext = applicationContext_IN
WHERE applicationId = applicationId_IN; WHERE applicationId = applicationId_IN;
SELECT ROW_COUNT() INTO rowCount_OUT; SELECT ROW_COUNT() INTO rowCount_OUT;
END // END //
CREATE PROCEDURE sp_getApplicationHomeSubCluster( CREATE PROCEDURE sp_getApplicationHomeSubCluster(
IN applicationId_IN varchar(64), IN applicationId_IN varchar(64),
OUT homeSubCluster_OUT varchar(256)) OUT homeSubCluster_OUT varchar(256),
OUT createTime_OUT datetime,
OUT applicationContext_OUT BLOB)
BEGIN BEGIN
SELECT homeSubCluster INTO homeSubCluster_OUT SELECT homeSubCluster, applicationContext, createTime
INTO homeSubCluster_OUT, applicationContext_OUT, createTime_OUT
FROM applicationsHomeSubCluster FROM applicationsHomeSubCluster
WHERE applicationId = applicationID_IN; WHERE applicationId = applicationID_IN;
END // END //

View File

@ -18,12 +18,13 @@
-- Script to generate all the tables for the Federation StateStore in MySQL -- Script to generate all the tables for the Federation StateStore in MySQL
USE FederationStateStore USE FederationStateStore;
CREATE TABLE applicationsHomeSubCluster( CREATE TABLE applicationsHomeSubCluster(
applicationId varchar(64) NOT NULL, applicationId varchar(64) NOT NULL,
homeSubCluster varchar(256) NOT NULL, homeSubCluster varchar(256) NOT NULL,
createTime datetime NOT NULL, createTime datetime NOT NULL,
applicationContext BLOB NULL,
CONSTRAINT pk_applicationId PRIMARY KEY (applicationId) CONSTRAINT pk_applicationId PRIMARY KEY (applicationId)
); );

View File

@ -26,6 +26,7 @@ GO
CREATE PROCEDURE [dbo].[sp_addApplicationHomeSubCluster] CREATE PROCEDURE [dbo].[sp_addApplicationHomeSubCluster]
@applicationId_IN VARCHAR(64), @applicationId_IN VARCHAR(64),
@homeSubCluster_IN VARCHAR(256), @homeSubCluster_IN VARCHAR(256),
@applicationContext_IN VARBINARY(MAX),
@storedHomeSubCluster_OUT VARCHAR(256) OUTPUT, @storedHomeSubCluster_OUT VARCHAR(256) OUTPUT,
@rowCount_OUT int OUTPUT @rowCount_OUT int OUTPUT
AS BEGIN AS BEGIN
@ -41,10 +42,14 @@ AS BEGIN
INSERT INTO [dbo].[applicationsHomeSubCluster] ( INSERT INTO [dbo].[applicationsHomeSubCluster] (
[applicationId], [applicationId],
[homeSubCluster]) [homeSubCluster],
[createTime],
[applicationContext])
VALUES ( VALUES (
@applicationId_IN, @applicationId_IN,
@homeSubCluster_IN); @homeSubCluster_IN,
GETUTCDATE(),
@applicationContext_IN);
-- End of the IF block -- End of the IF block
SELECT @rowCount_OUT = @@ROWCOUNT; SELECT @rowCount_OUT = @@ROWCOUNT;
@ -77,6 +82,7 @@ GO
CREATE PROCEDURE [dbo].[sp_updateApplicationHomeSubCluster] CREATE PROCEDURE [dbo].[sp_updateApplicationHomeSubCluster]
@applicationId_IN VARCHAR(64), @applicationId_IN VARCHAR(64),
@homeSubCluster_IN VARCHAR(256), @homeSubCluster_IN VARCHAR(256),
@applicationContext_IN VARBINARY(MAX),
@rowCount_OUT int OUTPUT @rowCount_OUT int OUTPUT
AS BEGIN AS BEGIN
DECLARE @errorMessage nvarchar(4000) DECLARE @errorMessage nvarchar(4000)
@ -85,7 +91,8 @@ AS BEGIN
BEGIN TRAN BEGIN TRAN
UPDATE [dbo].[applicationsHomeSubCluster] UPDATE [dbo].[applicationsHomeSubCluster]
SET [homeSubCluster] = @homeSubCluster_IN SET [homeSubCluster] = @homeSubCluster_IN,
[applicationContext] = @applicationContext_IN
WHERE [applicationId] = @applicationId_IN; WHERE [applicationId] = @applicationId_IN;
SELECT @rowCount_OUT = @@ROWCOUNT; SELECT @rowCount_OUT = @@ROWCOUNT;
@ -151,13 +158,17 @@ GO
CREATE PROCEDURE [dbo].[sp_getApplicationHomeSubCluster] CREATE PROCEDURE [dbo].[sp_getApplicationHomeSubCluster]
@applicationId_IN VARCHAR(64), @applicationId_IN VARCHAR(64),
@homeSubCluster_OUT VARCHAR(256) OUTPUT @homeSubCluster_OUT VARCHAR(256) OUTPUT,
@createTime_OUT datetime OUT,
@applicationContext_OUT VARBINARY(MAX) OUTPUT
AS BEGIN AS BEGIN
DECLARE @errorMessage nvarchar(4000) DECLARE @errorMessage nvarchar(4000)
BEGIN TRY BEGIN TRY
SELECT @homeSubCluster_OUT = [homeSubCluster] SELECT @homeSubCluster_OUT = [homeSubCluster],
@createTime_OUT = [createTime],
@applicationContext_OUT = [applicationContext]
FROM [dbo].[applicationsHomeSubCluster] FROM [dbo].[applicationsHomeSubCluster]
WHERE [applicationId] = @applicationId_IN; WHERE [applicationId] = @applicationId_IN;

View File

@ -35,7 +35,7 @@ IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
applicationId VARCHAR(64) COLLATE Latin1_General_100_BIN2 NOT NULL, applicationId VARCHAR(64) COLLATE Latin1_General_100_BIN2 NOT NULL,
homeSubCluster VARCHAR(256) NOT NULL, homeSubCluster VARCHAR(256) NOT NULL,
createTime DATETIME2 NOT NULL CONSTRAINT ts_createAppTime DEFAULT GETUTCDATE(), createTime DATETIME2 NOT NULL CONSTRAINT ts_createAppTime DEFAULT GETUTCDATE(),
applicationContext VARBINARY(MAX) NULL,
CONSTRAINT [pk_applicationId] PRIMARY KEY CONSTRAINT [pk_applicationId] PRIMARY KEY
( (
[applicationId] [applicationId]

View File

@ -18,13 +18,16 @@
package org.apache.hadoop.yarn.server.federation.store.impl; package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.CallableStatement; import java.sql.CallableStatement;
import java.sql.Connection; import java.sql.Connection;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Blob;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Calendar; import java.util.Calendar;
import java.util.List; import java.util.List;
@ -35,10 +38,13 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
@ -145,16 +151,16 @@ public class SQLFederationStateStore implements FederationStateStore {
"{call sp_subClusterHeartbeat(?, ?, ?, ?)}"; "{call sp_subClusterHeartbeat(?, ?, ?, ?)}";
private static final String CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER = private static final String CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER =
"{call sp_addApplicationHomeSubCluster(?, ?, ?, ?)}"; "{call sp_addApplicationHomeSubCluster(?, ?, ?, ?, ?)}";
private static final String CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER = private static final String CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER =
"{call sp_updateApplicationHomeSubCluster(?, ?, ?)}"; "{call sp_updateApplicationHomeSubCluster(?, ?, ?, ?)}";
private static final String CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER = private static final String CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER =
"{call sp_deleteApplicationHomeSubCluster(?, ?)}"; "{call sp_deleteApplicationHomeSubCluster(?, ?)}";
private static final String CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER = private static final String CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER =
"{call sp_getApplicationHomeSubCluster(?, ?)}"; "{call sp_getApplicationHomeSubCluster(?, ?, ?, ?)}";
private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER = private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER =
"{call sp_getApplicationsHomeSubCluster(?, ?)}"; "{call sp_getApplicationsHomeSubCluster(?, ?)}";
@ -610,10 +616,12 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
CallableStatement cstmt = null; CallableStatement cstmt = null;
String subClusterHome = null; String subClusterHome = null;
ApplicationId appId = ApplicationHomeSubCluster applicationHomeSubCluster =
request.getApplicationHomeSubCluster().getApplicationId(); request.getApplicationHomeSubCluster();
SubClusterId subClusterId = ApplicationId appId = applicationHomeSubCluster.getApplicationId();
request.getApplicationHomeSubCluster().getHomeSubCluster(); SubClusterId subClusterId = applicationHomeSubCluster.getHomeSubCluster();
ApplicationSubmissionContext appSubmissionContext =
applicationHomeSubCluster.getApplicationSubmissionContext();
try { try {
cstmt = getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER); cstmt = getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
@ -621,6 +629,12 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
// Set the parameters for the stored procedure // Set the parameters for the stored procedure
cstmt.setString("applicationId_IN", appId.toString()); cstmt.setString("applicationId_IN", appId.toString());
cstmt.setString("homeSubCluster_IN", subClusterId.getId()); cstmt.setString("homeSubCluster_IN", subClusterId.getId());
if (appSubmissionContext != null) {
cstmt.setBlob("applicationContext_IN", new ByteArrayInputStream(
((ApplicationSubmissionContextPBImpl) appSubmissionContext).getProto().toByteArray()));
} else {
cstmt.setNull("applicationContext_IN", Types.BLOB);
}
cstmt.registerOutParameter("storedHomeSubCluster_OUT", VARCHAR); cstmt.registerOutParameter("storedHomeSubCluster_OUT", VARCHAR);
cstmt.registerOutParameter("rowCount_OUT", INTEGER); cstmt.registerOutParameter("rowCount_OUT", INTEGER);
@ -687,10 +701,12 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
CallableStatement cstmt = null; CallableStatement cstmt = null;
ApplicationId appId = ApplicationHomeSubCluster applicationHomeSubCluster =
request.getApplicationHomeSubCluster().getApplicationId(); request.getApplicationHomeSubCluster();
SubClusterId subClusterId = ApplicationId appId = applicationHomeSubCluster.getApplicationId();
request.getApplicationHomeSubCluster().getHomeSubCluster(); SubClusterId subClusterId = applicationHomeSubCluster.getHomeSubCluster();
ApplicationSubmissionContext appSubmissionContext =
applicationHomeSubCluster.getApplicationSubmissionContext();
try { try {
cstmt = getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER); cstmt = getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
@ -698,6 +714,12 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
// Set the parameters for the stored procedure // Set the parameters for the stored procedure
cstmt.setString("applicationId_IN", appId.toString()); cstmt.setString("applicationId_IN", appId.toString());
cstmt.setString("homeSubCluster_IN", subClusterId.getId()); cstmt.setString("homeSubCluster_IN", subClusterId.getId());
if (appSubmissionContext != null) {
cstmt.setBlob("applicationContext_IN", new ByteArrayInputStream(
((ApplicationSubmissionContextPBImpl) appSubmissionContext).getProto().toByteArray()));
} else {
cstmt.setNull("applicationContext_IN", Types.BLOB);
}
cstmt.registerOutParameter("rowCount_OUT", INTEGER); cstmt.registerOutParameter("rowCount_OUT", INTEGER);
// Execute the query // Execute the query
@ -742,8 +764,9 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
CallableStatement cstmt = null; CallableStatement cstmt = null;
SubClusterId homeRM = null; SubClusterId homeRM = null;
Long createTime = 0L;
ApplicationId applicationId = request.getApplicationId(); ApplicationId applicationId = request.getApplicationId();
ApplicationSubmissionContext appSubmissionContext = null;
try { try {
cstmt = getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER); cstmt = getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
@ -751,6 +774,8 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
// Set the parameters for the stored procedure // Set the parameters for the stored procedure
cstmt.setString("applicationId_IN", applicationId.toString()); cstmt.setString("applicationId_IN", applicationId.toString());
cstmt.registerOutParameter("homeSubCluster_OUT", VARCHAR); cstmt.registerOutParameter("homeSubCluster_OUT", VARCHAR);
cstmt.registerOutParameter("createTime_OUT", java.sql.Types.TIMESTAMP);
cstmt.registerOutParameter("applicationContext_OUT", Types.BLOB);
// Execute the query // Execute the query
long startTime = clock.getTime(); long startTime = clock.getTime();
@ -765,6 +790,15 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
"Application %s does not exist.", applicationId); "Application %s does not exist.", applicationId);
} }
Timestamp createTimeStamp = cstmt.getTimestamp("createTime_OUT", utcCalendar);
createTime = createTimeStamp != null ? createTimeStamp.getTime() : 0;
Blob blobAppContextData = cstmt.getBlob("applicationContext_OUT");
if (blobAppContextData != null && request.getContainsAppSubmissionContext()) {
appSubmissionContext = new ApplicationSubmissionContextPBImpl(
ApplicationSubmissionContextProto.parseFrom(blobAppContextData.getBinaryStream()));
}
LOG.debug("Got the information about the specified application {}." LOG.debug("Got the information about the specified application {}."
+ " The AM is running in {}", applicationId, homeRM); + " The AM is running in {}", applicationId, homeRM);
@ -775,11 +809,17 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to obtain the application information for the specified application %s.", "Unable to obtain the application information for the specified application %s.",
applicationId); applicationId);
} catch (IOException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to obtain the application information for the specified application %s.",
applicationId);
} finally { } finally {
// Return to the pool the CallableStatement // Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt); FederationStateStoreUtils.returnToPool(LOG, cstmt);
} }
return GetApplicationHomeSubClusterResponse.newInstance(request.getApplicationId(), homeRM); return GetApplicationHomeSubClusterResponse.newInstance(applicationId, homeRM,
createTime, appSubmissionContext);
} }
@Override @Override

View File

@ -183,8 +183,9 @@ public void setCreateTime(long time) {
@Override @Override
public void setApplicationSubmissionContext(ApplicationSubmissionContext context) { public void setApplicationSubmissionContext(ApplicationSubmissionContext context) {
maybeInitBuilder(); maybeInitBuilder();
if (applicationSubmissionContext == null) { if (context == null) {
builder.clearAppSubmitContext(); builder.clearAppSubmitContext();
return;
} }
this.applicationSubmissionContext = context; this.applicationSubmissionContext = context;
builder.setAppSubmitContext(convertToProtoFormat(context)); builder.setAppSubmitContext(convertToProtoFormat(context));

View File

@ -34,6 +34,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
@ -1086,4 +1088,28 @@ public void testLoadVersion() throws Exception {
public void testCheckVersion() throws Exception { public void testCheckVersion() throws Exception {
stateStore.checkVersion(); stateStore.checkVersion();
} }
@Test
public void testGetApplicationHomeSubClusterWithContext() throws Exception {
FederationStateStore federationStateStore = this.getStateStore();
ApplicationId appId = ApplicationId.newInstance(1, 3);
SubClusterId subClusterId = SubClusterId.newInstance("SC");
ApplicationSubmissionContext context =
ApplicationSubmissionContext.newInstance(appId, "test", "default",
Priority.newInstance(0), null, true, true,
2, Resource.newInstance(10, 2), "test");
addApplicationHomeSC(appId, subClusterId, context);
GetApplicationHomeSubClusterRequest getRequest =
GetApplicationHomeSubClusterRequest.newInstance(appId, true);
GetApplicationHomeSubClusterResponse result =
federationStateStore.getApplicationHomeSubCluster(getRequest);
ApplicationHomeSubCluster applicationHomeSubCluster = result.getApplicationHomeSubCluster();
assertEquals(appId, applicationHomeSubCluster.getApplicationId());
assertEquals(subClusterId, applicationHomeSubCluster.getHomeSubCluster());
assertEquals(context, applicationHomeSubCluster.getApplicationSubmissionContext());
}
} }

View File

@ -52,6 +52,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+ " applicationId varchar(64) NOT NULL," + " applicationId varchar(64) NOT NULL,"
+ " homeSubCluster varchar(256) NOT NULL," + " homeSubCluster varchar(256) NOT NULL,"
+ " createTime datetime NOT NULL," + " createTime datetime NOT NULL,"
+ " applicationContext BLOB NULL,"
+ " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))"; + " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))";
private static final String TABLE_MEMBERSHIP = private static final String TABLE_MEMBERSHIP =
@ -173,12 +174,14 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
"CREATE PROCEDURE sp_addApplicationHomeSubCluster(" "CREATE PROCEDURE sp_addApplicationHomeSubCluster("
+ " IN applicationId_IN varchar(64)," + " IN applicationId_IN varchar(64),"
+ " IN homeSubCluster_IN varchar(256)," + " IN homeSubCluster_IN varchar(256),"
+ " IN applicationContext_IN BLOB,"
+ " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)" + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
+ " MODIFIES SQL DATA BEGIN ATOMIC" + " MODIFIES SQL DATA BEGIN ATOMIC"
+ " INSERT INTO applicationsHomeSubCluster " + " INSERT INTO applicationsHomeSubCluster "
+ " (applicationId,homeSubCluster,createTime) " + " (applicationId,homeSubCluster,createTime,applicationContext) "
+ " (SELECT applicationId_IN, homeSubCluster_IN, " + " (SELECT applicationId_IN, homeSubCluster_IN, "
+ " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE" + " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE, "
+ " applicationContext_IN "
+ " FROM applicationsHomeSubCluster" + " FROM applicationsHomeSubCluster"
+ " WHERE applicationId = applicationId_IN" + " WHERE applicationId = applicationId_IN"
+ " HAVING COUNT(*) = 0 );" + " HAVING COUNT(*) = 0 );"
@ -190,19 +193,24 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
private static final String SP_UPDATEAPPLICATIONHOMESUBCLUSTER = private static final String SP_UPDATEAPPLICATIONHOMESUBCLUSTER =
"CREATE PROCEDURE sp_updateApplicationHomeSubCluster(" "CREATE PROCEDURE sp_updateApplicationHomeSubCluster("
+ " IN applicationId_IN varchar(64)," + " IN applicationId_IN varchar(64),"
+ " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)" + " IN homeSubCluster_IN varchar(256), "
+ " IN applicationContext_IN BLOB, OUT rowCount_OUT int)"
+ " MODIFIES SQL DATA BEGIN ATOMIC" + " MODIFIES SQL DATA BEGIN ATOMIC"
+ " UPDATE applicationsHomeSubCluster" + " UPDATE applicationsHomeSubCluster"
+ " SET homeSubCluster = homeSubCluster_IN" + " SET homeSubCluster = homeSubCluster_IN, "
+ " applicationContext = applicationContext_IN "
+ " WHERE applicationId = applicationId_IN;" + " WHERE applicationId = applicationId_IN;"
+ " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
private static final String SP_GETAPPLICATIONHOMESUBCLUSTER = private static final String SP_GETAPPLICATIONHOMESUBCLUSTER =
"CREATE PROCEDURE sp_getApplicationHomeSubCluster(" "CREATE PROCEDURE sp_getApplicationHomeSubCluster("
+ " IN applicationId_IN varchar(64)," + " IN applicationId_IN varchar(64),"
+ " OUT homeSubCluster_OUT varchar(256))" + " OUT homeSubCluster_OUT varchar(256),"
+ " OUT createTime_OUT datetime,"
+ " OUT applicationContext_OUT BLOB)"
+ " MODIFIES SQL DATA BEGIN ATOMIC" + " MODIFIES SQL DATA BEGIN ATOMIC"
+ " SELECT homeSubCluster INTO homeSubCluster_OUT" + " SELECT homeSubCluster, applicationContext, createTime "
+ " INTO homeSubCluster_OUT, applicationContext_OUT, createTime_OUT "
+ " FROM applicationsHomeSubCluster" + " FROM applicationsHomeSubCluster"
+ " WHERE applicationId = applicationID_IN; END"; + " WHERE applicationId = applicationID_IN; END";

View File

@ -19,10 +19,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@ -31,10 +27,6 @@
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -96,30 +88,4 @@ protected void checkRouterStoreToken(RMDelegationTokenIdentifier identifier,
assertTrue(tokenIdentifier instanceof RMDelegationTokenIdentifier); assertTrue(tokenIdentifier instanceof RMDelegationTokenIdentifier);
assertEquals(identifier, tokenIdentifier); assertEquals(identifier, tokenIdentifier);
} }
@Test
public void testGetApplicationHomeSubClusterWithContext() throws Exception {
MemoryFederationStateStore memoryStateStore =
MemoryFederationStateStore.class.cast(this.getStateStore());
ApplicationId appId = ApplicationId.newInstance(1, 3);
SubClusterId subClusterId = SubClusterId.newInstance("SC");
ApplicationSubmissionContext context =
ApplicationSubmissionContext.newInstance(appId, "test", "default",
Priority.newInstance(0), null, true, true,
2, Resource.newInstance(10, 2), "test");
addApplicationHomeSC(appId, subClusterId, context);
GetApplicationHomeSubClusterRequest getRequest =
GetApplicationHomeSubClusterRequest.newInstance(appId, true);
GetApplicationHomeSubClusterResponse result =
memoryStateStore.getApplicationHomeSubCluster(getRequest);
assertEquals(appId,
result.getApplicationHomeSubCluster().getApplicationId());
assertEquals(subClusterId,
result.getApplicationHomeSubCluster().getHomeSubCluster());
assertEquals(context,
result.getApplicationHomeSubCluster().getApplicationSubmissionContext());
}
} }

View File

@ -33,10 +33,6 @@
import org.apache.hadoop.metrics2.impl.MetricsRecords; import org.apache.hadoop.metrics2.impl.MetricsRecords;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@ -45,10 +41,6 @@
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -284,29 +276,4 @@ protected void checkRouterStoreToken(RMDelegationTokenIdentifier identifier,
assertNotNull(zkRouterStoreToken); assertNotNull(zkRouterStoreToken);
assertEquals(token, zkRouterStoreToken); assertEquals(token, zkRouterStoreToken);
} }
@Test
public void testGetApplicationHomeSubClusterWithContext() throws Exception {
ZookeeperFederationStateStore zkFederationStateStore =
(ZookeeperFederationStateStore) this.getStateStore();
ApplicationId appId = ApplicationId.newInstance(1, 3);
SubClusterId subClusterId = SubClusterId.newInstance("SC");
ApplicationSubmissionContext context =
ApplicationSubmissionContext.newInstance(appId, "test", "default",
Priority.newInstance(0), null, true, true,
2, Resource.newInstance(10, 2), "test");
addApplicationHomeSC(appId, subClusterId, context);
GetApplicationHomeSubClusterRequest getRequest =
GetApplicationHomeSubClusterRequest.newInstance(appId, true);
GetApplicationHomeSubClusterResponse result =
zkFederationStateStore.getApplicationHomeSubCluster(getRequest);
ApplicationHomeSubCluster applicationHomeSubCluster = result.getApplicationHomeSubCluster();
assertEquals(appId, applicationHomeSubCluster.getApplicationId());
assertEquals(subClusterId, applicationHomeSubCluster.getHomeSubCluster());
assertEquals(context, applicationHomeSubCluster.getApplicationSubmissionContext());
}
} }