YARN-5603. Metrics for Federation StateStore. (Ellen Hui via asuresh)

(cherry picked from commit 75abc9a8e2)
This commit is contained in:
Arun Suresh 2017-08-21 22:43:08 -07:00 committed by Carlo Curino
parent 261f769d79
commit ac090b38ad
4 changed files with 426 additions and 0 deletions

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@ -72,6 +73,8 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembership
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -137,6 +140,7 @@ public class SQLFederationStateStore implements FederationStateStore {
private String url;
private int maximumPoolSize;
private HikariDataSource dataSource = null;
private final Clock clock = new MonotonicClock();
@Override
public void init(Configuration conf) throws YarnException {
@ -203,7 +207,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(9, java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not add a new subcluster into FederationStateStore
@ -222,8 +228,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info(
"Registered the SubCluster " + subClusterId + " into the StateStore");
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to register the SubCluster " + subClusterId
+ " into the StateStore",
@ -260,7 +269,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not deregister the subcluster into FederationStateStore
@ -278,8 +289,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info("Deregistered the SubCluster " + subClusterId + " state to "
+ state.toString());
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to deregister the sub-cluster " + subClusterId + " state to "
+ state.toString(),
@ -317,7 +331,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not update the subcluster into FederationStateStore
@ -336,8 +352,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info("Heartbeated the StateStore for the specified SubCluster "
+ subClusterId);
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to heartbeat the StateStore for the specified SubCluster "
+ subClusterId,
@ -378,7 +397,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(9, java.sql.Types.VARCHAR);
// Execute the query
long startTime = clock.getTime();
cstmt.execute();
long stopTime = clock.getTime();
String amRMAddress = cstmt.getString(2);
String clientRMAddress = cstmt.getString(3);
@ -403,6 +424,9 @@ public class SQLFederationStateStore implements FederationStateStore {
clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state,
lastStartTime, capability);
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
// Check if the output it is a valid subcluster
try {
FederationMembershipStateStoreInputValidator
@ -417,6 +441,7 @@ public class SQLFederationStateStore implements FederationStateStore {
+ subClusterInfo.toString());
}
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the SubCluster information for " + subClusterId, e);
} finally {
@ -439,7 +464,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS);
// Execute the query
long startTime = clock.getTime();
rs = cstmt.executeQuery();
long stopTime = clock.getTime();
while (rs.next()) {
@ -459,6 +486,10 @@ public class SQLFederationStateStore implements FederationStateStore {
amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
lastHeartBeat, state, lastStartTime, capability);
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
// Check if the output it is a valid subcluster
try {
FederationMembershipStateStoreInputValidator
@ -477,6 +508,7 @@ public class SQLFederationStateStore implements FederationStateStore {
}
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the information for all the SubClusters ", e);
} finally {
@ -513,11 +545,16 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
subClusterHome = cstmt.getString(3);
SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome);
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
// For failover reason, we check the returned SubClusterId.
// If it is equal to the subclusterId we sent, the call added the new
// application into FederationStateStore. If the call returns a different
@ -554,6 +591,7 @@ public class SQLFederationStateStore implements FederationStateStore {
}
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils
.logAndThrowRetriableException(LOG,
"Unable to insert the newly generated application "
@ -592,7 +630,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not update the application into FederationStateStore
@ -611,8 +651,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info(
"Update the SubCluster to {} for application {} in the StateStore",
subClusterId, appId);
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils
.logAndThrowRetriableException(LOG,
"Unable to update the application "
@ -645,7 +688,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
// Execute the query
long startTime = clock.getTime();
cstmt.execute();
long stopTime = clock.getTime();
if (cstmt.getString(2) != null) {
homeRM = SubClusterId.newInstance(cstmt.getString(2));
@ -659,7 +704,12 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.debug("Got the information about the specified application "
+ request.getApplicationId() + ". The AM is running in " + homeRM);
}
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the application information "
+ "for the specified application " + request.getApplicationId(),
@ -688,7 +738,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
// Execute the query
long startTime = clock.getTime();
rs = cstmt.executeQuery();
long stopTime = clock.getTime();
while (rs.next()) {
@ -701,7 +753,11 @@ public class SQLFederationStateStore implements FederationStateStore {
SubClusterId.newInstance(homeSubCluster)));
}
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the information for all the applications ", e);
} finally {
@ -731,7 +787,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(2, java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not delete the application from FederationStateStore
@ -750,8 +808,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info("Delete from the StateStore the application: {}",
request.getApplicationId());
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to delete the application " + request.getApplicationId(), e);
} finally {
@ -782,7 +843,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(3, java.sql.Types.VARBINARY);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
// Check if the output it is a valid policy
if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) {
@ -798,7 +861,11 @@ public class SQLFederationStateStore implements FederationStateStore {
return null;
}
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to select the policy for the queue :" + request.getQueue(),
e);
@ -833,7 +900,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
// Execute the query
long startTime = clock.getTime();
cstmt.executeUpdate();
long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not add a new policy into FederationStateStore
@ -852,8 +921,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info("Insert into the state store the policy for the queue: "
+ policyConf.getQueue());
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to insert the newly generated policy for the queue :"
+ policyConf.getQueue(),
@ -880,7 +952,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS);
// Execute the query
long startTime = clock.getTime();
rs = cstmt.executeQuery();
long stopTime = clock.getTime();
while (rs.next()) {
@ -894,7 +968,12 @@ public class SQLFederationStateStore implements FederationStateStore {
ByteBuffer.wrap(policyInfo));
policyConfigurations.add(subClusterPolicyConfiguration);
}
FederationStateStoreClientMetrics
.succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the policy information for all the queues.", e);
} finally {

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.metrics;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Performance metrics for FederationStateStore implementations.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@Metrics(about = "Performance and usage metrics for Federation StateStore",
context = "fedr")
public final class FederationStateStoreClientMetrics implements MetricsSource {
public static final Logger LOG =
LoggerFactory.getLogger(FederationStateStoreClientMetrics.class);
private static final MetricsRegistry REGISTRY =
new MetricsRegistry("FederationStateStoreClientMetrics");
private final static Method[] STATESTORE_API_METHODS =
FederationStateStore.class.getMethods();
// Map method names to counter objects
private static final Map<String, MutableCounterLong> API_TO_FAILED_CALLS =
new HashMap<String, MutableCounterLong>();
private static final Map<String, MutableRate> API_TO_SUCCESSFUL_CALLS =
new HashMap<String, MutableRate>();
// Provide quantile latency for each api call.
private static final Map<String, MutableQuantiles> API_TO_QUANTILE_METRICS =
new HashMap<String, MutableQuantiles>();
// Error string templates for logging calls from methods not in
// FederationStateStore API
private static final String UNKOWN_FAIL_ERROR_MSG =
"Not recording failed call for unknown FederationStateStore method {}";
private static final String UNKNOWN_SUCCESS_ERROR_MSG =
"Not recording successful call for unknown "
+ "FederationStateStore method {}";
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful calls and latency(ms)")
private static MutableRate totalSucceededCalls;
@Metric("Total number of failed StateStore calls")
private static MutableCounterLong totalFailedCalls;
// This after the static members are initialized, or the constructor will
// throw a NullPointerException
private static final FederationStateStoreClientMetrics S_INSTANCE =
DefaultMetricsSystem.instance()
.register(new FederationStateStoreClientMetrics());
synchronized public static FederationStateStoreClientMetrics getInstance() {
return S_INSTANCE;
}
private FederationStateStoreClientMetrics() {
// Create the metrics for each method and put them into the map
for (Method m : STATESTORE_API_METHODS) {
String methodName = m.getName();
LOG.debug("Registering Federation StateStore Client metrics for {}",
methodName);
// This metric only records the number of failed calls; it does not
// capture latency information
API_TO_FAILED_CALLS.put(methodName,
REGISTRY.newCounter(methodName + "_numFailedCalls",
"# failed calls to " + methodName, 0L));
// This metric records both the number and average latency of successful
// calls.
API_TO_SUCCESSFUL_CALLS.put(methodName,
REGISTRY.newRate(methodName + "_successfulCalls",
"# successful calls and latency(ms) for" + methodName));
// This metric records the quantile-based latency of each successful call,
// re-sampled every 10 seconds.
API_TO_QUANTILE_METRICS.put(methodName,
REGISTRY.newQuantiles(methodName + "Latency",
"Quantile latency (ms) for " + methodName, "ops", "latency", 10));
}
}
public static void failedStateStoreCall() {
String methodName =
Thread.currentThread().getStackTrace()[2].getMethodName();
MutableCounterLong methodMetric = API_TO_FAILED_CALLS.get(methodName);
if (methodMetric == null) {
LOG.error(UNKOWN_FAIL_ERROR_MSG, methodName);
return;
}
totalFailedCalls.incr();
methodMetric.incr();
}
public static void succeededStateStoreCall(long duration) {
String methodName =
Thread.currentThread().getStackTrace()[2].getMethodName();
MutableRate methodMetric = API_TO_SUCCESSFUL_CALLS.get(methodName);
MutableQuantiles methodQuantileMetric =
API_TO_QUANTILE_METRICS.get(methodName);
if (methodMetric == null || methodQuantileMetric == null) {
LOG.error(UNKNOWN_SUCCESS_ERROR_MSG, methodName);
return;
}
totalSucceededCalls.add(duration);
methodMetric.add(duration);
methodQuantileMetric.add(duration);
}
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
REGISTRY.snapshot(collector.addRecord(REGISTRY.info()), all);
}
// Getters for unit testing
@VisibleForTesting
static long getNumFailedCallsForMethod(String methodName) {
return API_TO_FAILED_CALLS.get(methodName).value();
}
@VisibleForTesting
static long getNumSucceessfulCallsForMethod(String methodName) {
return API_TO_SUCCESSFUL_CALLS.get(methodName).lastStat().numSamples();
}
@VisibleForTesting
static double getLatencySucceessfulCallsForMethod(String methodName) {
return API_TO_SUCCESSFUL_CALLS.get(methodName).lastStat().mean();
}
@VisibleForTesting
static long getNumFailedCalls() {
return totalFailedCalls.value();
}
@VisibleForTesting
static long getNumSucceededCalls() {
return totalSucceededCalls.lastStat().numSamples();
}
@VisibleForTesting
static double getLatencySucceededCalls() {
return totalSucceededCalls.lastStat().mean();
}
}

View File

@ -0,0 +1,17 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.metrics;

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.metrics;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Unittests for {@link FederationStateStoreClientMetrics}.
*
*/
public class TestFederationStateStoreClientMetrics {
public static final Logger LOG =
LoggerFactory.getLogger(TestFederationStateStoreClientMetrics.class);
private MockBadFederationStateStore badStateStore =
new MockBadFederationStateStore();
private MockGoodFederationStateStore goodStateStore =
new MockGoodFederationStateStore();
@Test
public void testAggregateMetricInit() {
LOG.info("Test: aggregate metrics are initialized correctly");
Assert.assertEquals(0,
FederationStateStoreClientMetrics.getNumSucceededCalls());
Assert.assertEquals(0,
FederationStateStoreClientMetrics.getNumFailedCalls());
LOG.info("Test: aggregate metrics are updated correctly");
}
@Test
public void testSuccessfulCalls() {
LOG.info("Test: Aggregate and method successful calls updated correctly");
long totalGoodBefore =
FederationStateStoreClientMetrics.getNumSucceededCalls();
long apiGoodBefore = FederationStateStoreClientMetrics
.getNumSucceessfulCallsForMethod("registerSubCluster");
goodStateStore.registerSubCluster(100);
Assert.assertEquals(totalGoodBefore + 1,
FederationStateStoreClientMetrics.getNumSucceededCalls());
Assert.assertEquals(100,
FederationStateStoreClientMetrics.getLatencySucceededCalls(), 0);
Assert.assertEquals(apiGoodBefore + 1,
FederationStateStoreClientMetrics.getNumSucceededCalls());
Assert.assertEquals(100, FederationStateStoreClientMetrics
.getLatencySucceessfulCallsForMethod("registerSubCluster"), 0);
LOG.info("Test: Running stats correctly calculated for 2 metrics");
goodStateStore.registerSubCluster(200);
Assert.assertEquals(totalGoodBefore + 2,
FederationStateStoreClientMetrics.getNumSucceededCalls());
Assert.assertEquals(150,
FederationStateStoreClientMetrics.getLatencySucceededCalls(), 0);
Assert.assertEquals(apiGoodBefore + 2,
FederationStateStoreClientMetrics.getNumSucceededCalls());
Assert.assertEquals(150, FederationStateStoreClientMetrics
.getLatencySucceessfulCallsForMethod("registerSubCluster"), 0);
}
@Test
public void testFailedCalls() {
long totalBadbefore = FederationStateStoreClientMetrics.getNumFailedCalls();
long apiBadBefore = FederationStateStoreClientMetrics
.getNumFailedCallsForMethod("registerSubCluster");
badStateStore.registerSubCluster();
LOG.info("Test: Aggregate and method failed calls updated correctly");
Assert.assertEquals(totalBadbefore + 1,
FederationStateStoreClientMetrics.getNumFailedCalls());
Assert.assertEquals(apiBadBefore + 1, FederationStateStoreClientMetrics
.getNumFailedCallsForMethod("registerSubCluster"));
}
@Test
public void testCallsUnknownMethod() {
long totalBadbefore = FederationStateStoreClientMetrics.getNumFailedCalls();
long apiBadBefore = FederationStateStoreClientMetrics
.getNumFailedCallsForMethod("registerSubCluster");
long totalGoodBefore =
FederationStateStoreClientMetrics.getNumSucceededCalls();
long apiGoodBefore = FederationStateStoreClientMetrics
.getNumSucceessfulCallsForMethod("registerSubCluster");
LOG.info("Calling Metrics class directly");
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreClientMetrics.succeededStateStoreCall(100);
LOG.info("Test: Aggregate and method calls did not update");
Assert.assertEquals(totalBadbefore,
FederationStateStoreClientMetrics.getNumFailedCalls());
Assert.assertEquals(apiBadBefore, FederationStateStoreClientMetrics
.getNumFailedCallsForMethod("registerSubCluster"));
Assert.assertEquals(totalGoodBefore,
FederationStateStoreClientMetrics.getNumSucceededCalls());
Assert.assertEquals(apiGoodBefore, FederationStateStoreClientMetrics
.getNumSucceessfulCallsForMethod("registerSubCluster"));
}
// Records failures for all calls
private class MockBadFederationStateStore {
public void registerSubCluster() {
LOG.info("Mocked: failed registerSubCluster call");
FederationStateStoreClientMetrics.failedStateStoreCall();
}
}
// Records successes for all calls
private class MockGoodFederationStateStore {
public void registerSubCluster(long duration) {
LOG.info("Mocked: successful registerSubCluster call with duration {}",
duration);
FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
}
}
}