YARN-6526. Refactoring SQLFederationStateStore by avoiding to recreate a connection at every call. COntributed by Bilwa S T.
This commit is contained in:
parent
e0c1d8a969
commit
2c03524fa4
|
@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.util.MonotonicClock;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
|
||||
/**
|
||||
|
@ -141,6 +142,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
private int maximumPoolSize;
|
||||
private HikariDataSource dataSource = null;
|
||||
private final Clock clock = new MonotonicClock();
|
||||
@VisibleForTesting
|
||||
Connection conn = null;
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf) throws YarnException {
|
||||
|
@ -173,6 +176,13 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
dataSource.setMaximumPoolSize(maximumPoolSize);
|
||||
LOG.info("Initialized connection pool to the Federation StateStore "
|
||||
+ "database at address: " + url);
|
||||
try {
|
||||
conn = getConnection();
|
||||
LOG.debug("Connection created");
|
||||
} catch (SQLException e) {
|
||||
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
||||
"Not able to get Connection", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -185,15 +195,13 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
.validate(registerSubClusterRequest);
|
||||
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
|
||||
SubClusterInfo subClusterInfo =
|
||||
registerSubClusterRequest.getSubClusterInfo();
|
||||
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_REGISTER_SUBCLUSTER);
|
||||
cstmt = getCallableStatement(CALL_SP_REGISTER_SUBCLUSTER);
|
||||
|
||||
// Set the parameters for the stored procedure
|
||||
cstmt.setString(1, subClusterId.getId());
|
||||
|
@ -238,9 +246,10 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
+ " into the StateStore",
|
||||
e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt);
|
||||
}
|
||||
|
||||
return SubClusterRegisterResponse.newInstance();
|
||||
}
|
||||
|
||||
|
@ -254,14 +263,12 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
.validate(subClusterDeregisterRequest);
|
||||
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
|
||||
SubClusterId subClusterId = subClusterDeregisterRequest.getSubClusterId();
|
||||
SubClusterState state = subClusterDeregisterRequest.getState();
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_DEREGISTER_SUBCLUSTER);
|
||||
cstmt = getCallableStatement(CALL_SP_DEREGISTER_SUBCLUSTER);
|
||||
|
||||
// Set the parameters for the stored procedure
|
||||
cstmt.setString(1, subClusterId.getId());
|
||||
|
@ -299,8 +306,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
+ state.toString(),
|
||||
e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt);
|
||||
}
|
||||
return SubClusterDeregisterResponse.newInstance();
|
||||
}
|
||||
|
@ -315,14 +322,12 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
.validate(subClusterHeartbeatRequest);
|
||||
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
|
||||
SubClusterId subClusterId = subClusterHeartbeatRequest.getSubClusterId();
|
||||
SubClusterState state = subClusterHeartbeatRequest.getState();
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_SUBCLUSTER_HEARTBEAT);
|
||||
cstmt = getCallableStatement(CALL_SP_SUBCLUSTER_HEARTBEAT);
|
||||
|
||||
// Set the parameters for the stored procedure
|
||||
cstmt.setString(1, subClusterId.getId());
|
||||
|
@ -362,8 +367,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
+ subClusterId,
|
||||
e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt);
|
||||
}
|
||||
return SubClusterHeartbeatResponse.newInstance();
|
||||
}
|
||||
|
@ -376,14 +381,12 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationMembershipStateStoreInputValidator.validate(subClusterRequest);
|
||||
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
|
||||
SubClusterInfo subClusterInfo = null;
|
||||
SubClusterId subClusterId = subClusterRequest.getSubClusterId();
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTER);
|
||||
cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTER);
|
||||
cstmt.setString(1, subClusterId.getId());
|
||||
|
||||
// Set the parameters for the stored procedure
|
||||
|
@ -443,8 +446,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
||||
"Unable to obtain the SubCluster information for " + subClusterId, e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt);
|
||||
}
|
||||
return GetSubClusterInfoResponse.newInstance(subClusterInfo);
|
||||
}
|
||||
|
@ -453,13 +456,11 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
public GetSubClustersInfoResponse getSubClusters(
|
||||
GetSubClustersInfoRequest subClustersRequest) throws YarnException {
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
ResultSet rs = null;
|
||||
List<SubClusterInfo> subClusters = new ArrayList<SubClusterInfo>();
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS);
|
||||
cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTERS);
|
||||
|
||||
// Execute the query
|
||||
long startTime = clock.getTime();
|
||||
|
@ -510,8 +511,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
||||
"Unable to obtain the information for all the SubClusters ", e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
|
||||
}
|
||||
return GetSubClustersInfoResponse.newInstance(subClusters);
|
||||
}
|
||||
|
@ -524,7 +525,6 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
||||
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
|
||||
String subClusterHome = null;
|
||||
ApplicationId appId =
|
||||
|
@ -533,8 +533,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
request.getApplicationHomeSubCluster().getHomeSubCluster();
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
|
||||
cstmt = getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
|
||||
|
||||
// Set the parameters for the stored procedure
|
||||
cstmt.setString(1, appId.toString());
|
||||
|
@ -596,8 +595,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
+ request.getApplicationHomeSubCluster().getApplicationId(),
|
||||
e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt);
|
||||
}
|
||||
return AddApplicationHomeSubClusterResponse
|
||||
.newInstance(SubClusterId.newInstance(subClusterHome));
|
||||
|
@ -611,7 +610,6 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
||||
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
|
||||
ApplicationId appId =
|
||||
request.getApplicationHomeSubCluster().getApplicationId();
|
||||
|
@ -619,8 +617,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
request.getApplicationHomeSubCluster().getHomeSubCluster();
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
|
||||
cstmt = getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
|
||||
|
||||
// Set the parameters for the stored procedure
|
||||
cstmt.setString(1, appId.toString());
|
||||
|
@ -660,8 +657,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
+ request.getApplicationHomeSubCluster().getApplicationId(),
|
||||
e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt);
|
||||
}
|
||||
return UpdateApplicationHomeSubClusterResponse.newInstance();
|
||||
}
|
||||
|
@ -673,13 +670,11 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
||||
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
|
||||
SubClusterId homeRM = null;
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
|
||||
cstmt = getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
|
||||
|
||||
// Set the parameters for the stored procedure
|
||||
cstmt.setString(1, request.getApplicationId().toString());
|
||||
|
@ -711,9 +706,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
+ "for the specified application " + request.getApplicationId(),
|
||||
e);
|
||||
} finally {
|
||||
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt);
|
||||
}
|
||||
return GetApplicationHomeSubClusterResponse
|
||||
.newInstance(ApplicationHomeSubCluster
|
||||
|
@ -724,14 +718,12 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
|
||||
GetApplicationsHomeSubClusterRequest request) throws YarnException {
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
ResultSet rs = null;
|
||||
List<ApplicationHomeSubCluster> appsHomeSubClusters =
|
||||
new ArrayList<ApplicationHomeSubCluster>();
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
|
||||
cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
|
||||
|
||||
// Execute the query
|
||||
long startTime = clock.getTime();
|
||||
|
@ -757,8 +749,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
||||
"Unable to obtain the information for all the applications ", e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
|
||||
}
|
||||
return GetApplicationsHomeSubClusterResponse
|
||||
.newInstance(appsHomeSubClusters);
|
||||
|
@ -772,11 +764,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
||||
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);
|
||||
cstmt = getCallableStatement(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);
|
||||
|
||||
// Set the parameters for the stored procedure
|
||||
cstmt.setString(1, request.getApplicationId().toString());
|
||||
|
@ -812,8 +802,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
||||
"Unable to delete the application " + request.getApplicationId(), e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt);
|
||||
}
|
||||
return DeleteApplicationHomeSubClusterResponse.newInstance();
|
||||
}
|
||||
|
@ -826,12 +816,10 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationPolicyStoreInputValidator.validate(request);
|
||||
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
SubClusterPolicyConfiguration subClusterPolicyConfiguration = null;
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_GET_POLICY_CONFIGURATION);
|
||||
cstmt = getCallableStatement(CALL_SP_GET_POLICY_CONFIGURATION);
|
||||
|
||||
// Set the parameters for the stored procedure
|
||||
cstmt.setString(1, request.getQueue());
|
||||
|
@ -864,8 +852,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
"Unable to select the policy for the queue :" + request.getQueue(),
|
||||
e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt);
|
||||
}
|
||||
return GetSubClusterPolicyConfigurationResponse
|
||||
.newInstance(subClusterPolicyConfiguration);
|
||||
|
@ -879,13 +867,11 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationPolicyStoreInputValidator.validate(request);
|
||||
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
|
||||
SubClusterPolicyConfiguration policyConf = request.getPolicyConfiguration();
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_SET_POLICY_CONFIGURATION);
|
||||
cstmt = getCallableStatement(CALL_SP_SET_POLICY_CONFIGURATION);
|
||||
|
||||
// Set the parameters for the stored procedure
|
||||
cstmt.setString(1, policyConf.getQueue());
|
||||
|
@ -925,8 +911,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
+ policyConf.getQueue(),
|
||||
e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt);
|
||||
}
|
||||
return SetSubClusterPolicyConfigurationResponse.newInstance();
|
||||
}
|
||||
|
@ -936,14 +922,12 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
|
||||
|
||||
CallableStatement cstmt = null;
|
||||
Connection conn = null;
|
||||
ResultSet rs = null;
|
||||
List<SubClusterPolicyConfiguration> policyConfigurations =
|
||||
new ArrayList<SubClusterPolicyConfiguration>();
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS);
|
||||
cstmt = getCallableStatement(CALL_SP_GET_POLICIES_CONFIGURATIONS);
|
||||
|
||||
// Execute the query
|
||||
long startTime = clock.getTime();
|
||||
|
@ -971,8 +955,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
||||
"Unable to obtain the policy information for all the queues.", e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement and the Connection
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
|
||||
// Return to the pool the CallableStatement
|
||||
FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
|
||||
}
|
||||
|
||||
return GetSubClusterPoliciesConfigurationsResponse
|
||||
|
@ -993,6 +977,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
public void close() throws Exception {
|
||||
if (dataSource != null) {
|
||||
dataSource.close();
|
||||
LOG.debug("Connection closed");
|
||||
FederationStateStoreClientMetrics.decrConnections();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1003,9 +989,15 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|||
* @throws SQLException on failure
|
||||
*/
|
||||
public Connection getConnection() throws SQLException {
|
||||
FederationStateStoreClientMetrics.incrConnections();
|
||||
return dataSource.getConnection();
|
||||
}
|
||||
|
||||
private CallableStatement getCallableStatement(String procedure)
|
||||
throws SQLException {
|
||||
return conn.prepareCall(procedure);
|
||||
}
|
||||
|
||||
private static byte[] getByteArray(ByteBuffer bb) {
|
||||
byte[] ba = new byte[bb.limit()];
|
||||
bb.get(ba);
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
|
|||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||
|
@ -80,6 +81,9 @@ public final class FederationStateStoreClientMetrics implements MetricsSource {
|
|||
@Metric("Total number of failed StateStore calls")
|
||||
private static MutableCounterLong totalFailedCalls;
|
||||
|
||||
@Metric("Total number of Connections")
|
||||
private static MutableGaugeInt totalConnections;
|
||||
|
||||
// This after the static members are initialized, or the constructor will
|
||||
// throw a NullPointerException
|
||||
private static final FederationStateStoreClientMetrics S_INSTANCE =
|
||||
|
@ -146,6 +150,14 @@ public final class FederationStateStoreClientMetrics implements MetricsSource {
|
|||
methodQuantileMetric.add(duration);
|
||||
}
|
||||
|
||||
public static void incrConnections() {
|
||||
totalConnections.incr();
|
||||
}
|
||||
|
||||
public static void decrConnections() {
|
||||
totalConnections.decr();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||
REGISTRY.snapshot(collector.addRecord(REGISTRY.info()), all);
|
||||
|
@ -181,4 +193,10 @@ public final class FederationStateStoreClientMetrics implements MetricsSource {
|
|||
static double getLatencySucceededCalls() {
|
||||
return totalSucceededCalls.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static int getNumConnections() {
|
||||
return totalConnections.value();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -69,6 +70,7 @@ public final class FederationStateStoreUtils {
|
|||
if (conn != null) {
|
||||
try {
|
||||
conn.close();
|
||||
FederationStateStoreClientMetrics.decrConnections();
|
||||
} catch (SQLException e) {
|
||||
logAndThrowException(log, "Exception while trying to close Connection",
|
||||
e);
|
||||
|
@ -98,6 +100,18 @@ public final class FederationStateStoreUtils {
|
|||
returnToPool(log, cstmt, conn, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the SQL <code>FederationStateStore</code> connections to the pool.
|
||||
*
|
||||
* @param log the logger interface
|
||||
* @param cstmt the interface used to execute SQL stored procedures
|
||||
* @throws YarnException on failure
|
||||
*/
|
||||
public static void returnToPool(Logger log, CallableStatement cstmt)
|
||||
throws YarnException {
|
||||
returnToPool(log, cstmt, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception due to an error in <code>FederationStateStore</code>.
|
||||
*
|
||||
|
|
|
@ -68,7 +68,7 @@ import org.junit.Test;
|
|||
public abstract class FederationStateStoreBaseTest {
|
||||
|
||||
private static final MonotonicClock CLOCK = new MonotonicClock();
|
||||
private FederationStateStore stateStore = createStateStore();
|
||||
private FederationStateStore stateStore;
|
||||
|
||||
protected abstract FederationStateStore createStateStore();
|
||||
|
||||
|
@ -76,6 +76,7 @@ public abstract class FederationStateStoreBaseTest {
|
|||
|
||||
@Before
|
||||
public void before() throws IOException, YarnException {
|
||||
stateStore = createStateStore();
|
||||
stateStore.init(conf);
|
||||
}
|
||||
|
||||
|
@ -516,7 +517,7 @@ public abstract class FederationStateStoreBaseTest {
|
|||
|
||||
// Convenience methods
|
||||
|
||||
private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
|
||||
SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
|
||||
|
||||
String amRMAddress = "1.2.3.4:1";
|
||||
String clientRMAddress = "1.2.3.4:2";
|
||||
|
@ -535,7 +536,7 @@ public abstract class FederationStateStoreBaseTest {
|
|||
return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb);
|
||||
}
|
||||
|
||||
private void addApplicationHomeSC(ApplicationId appId,
|
||||
void addApplicationHomeSC(ApplicationId appId,
|
||||
SubClusterId subClusterId) throws YarnException {
|
||||
ApplicationHomeSubCluster ahsc =
|
||||
ApplicationHomeSubCluster.newInstance(appId, subClusterId);
|
||||
|
@ -558,14 +559,14 @@ public abstract class FederationStateStoreBaseTest {
|
|||
SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||
}
|
||||
|
||||
private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
|
||||
SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
|
||||
throws YarnException {
|
||||
GetSubClusterInfoRequest request =
|
||||
GetSubClusterInfoRequest.newInstance(subClusterId);
|
||||
return stateStore.getSubCluster(request).getSubClusterInfo();
|
||||
}
|
||||
|
||||
private SubClusterId queryApplicationHomeSC(ApplicationId appId)
|
||||
SubClusterId queryApplicationHomeSC(ApplicationId appId)
|
||||
throws YarnException {
|
||||
GetApplicationHomeSubClusterRequest request =
|
||||
GetApplicationHomeSubClusterRequest.newInstance(appId);
|
||||
|
@ -594,4 +595,8 @@ public abstract class FederationStateStoreBaseTest {
|
|||
return conf;
|
||||
}
|
||||
|
||||
protected FederationStateStore getStateStore() {
|
||||
return stateStore;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -209,7 +209,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
|
|||
LOG.error("ERROR: failed to init HSQLDB " + e1.getMessage());
|
||||
}
|
||||
try {
|
||||
conn = getConnection();
|
||||
conn = super.conn;
|
||||
|
||||
LOG.info("Database Init: Start");
|
||||
|
||||
|
@ -234,7 +234,6 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
|
|||
conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute();
|
||||
|
||||
LOG.info("Database Init: Complete");
|
||||
conn.close();
|
||||
} catch (SQLException e) {
|
||||
LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage());
|
||||
}
|
||||
|
|
|
@ -17,8 +17,16 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.federation.store.impl;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Unit tests for SQLFederationStateStore.
|
||||
|
@ -46,4 +54,24 @@ public class TestSQLFederationStateStore extends FederationStateStoreBaseTest {
|
|||
super.setConf(conf);
|
||||
return new HSQLDBFederationStateStore();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSqlConnectionsCreatedCount() throws YarnException {
|
||||
FederationStateStore stateStore = getStateStore();
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
|
||||
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
|
||||
|
||||
stateStore.registerSubCluster(
|
||||
SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||
Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId));
|
||||
|
||||
addApplicationHomeSC(appId, subClusterId);
|
||||
Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId));
|
||||
|
||||
// Verify if connection is created only once at statestore init
|
||||
Assert.assertEquals(1,
|
||||
FederationStateStoreClientMetrics.getNumConnections());
|
||||
}
|
||||
}
|
|
@ -72,7 +72,6 @@ public class TestZookeeperFederationStateStore
|
|||
@After
|
||||
public void after() throws Exception {
|
||||
super.after();
|
||||
|
||||
curatorFramework.close();
|
||||
try {
|
||||
curatorTestingServer.stop();
|
||||
|
@ -82,8 +81,7 @@ public class TestZookeeperFederationStateStore
|
|||
|
||||
@Override
|
||||
protected FederationStateStore createStateStore() {
|
||||
Configuration conf = new Configuration();
|
||||
super.setConf(conf);
|
||||
super.setConf(getConf());
|
||||
return new ZookeeperFederationStateStore();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue