YARN-11217. [Federation] Add dumpSchedulerLogs REST APIs for Router. (#5272)
This commit is contained in:
parent
08f58ecf07
commit
af20841fb1
|
@ -135,6 +135,8 @@ public final class RouterMetrics {
|
|||
private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
|
||||
@Metric("# of renewDelegationToken failed to be retrieved")
|
||||
private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;
|
||||
@Metric("# of dumpSchedulerLogs failed to be retrieved")
|
||||
private MutableGaugeInt numDumpSchedulerLogsFailedRetrieved;
|
||||
@Metric("# of getActivities failed to be retrieved")
|
||||
private MutableGaugeInt numGetActivitiesFailedRetrieved;
|
||||
@Metric("# of getBulkActivities failed to be retrieved")
|
||||
|
@ -241,6 +243,8 @@ public final class RouterMetrics {
|
|||
private MutableRate totalSucceededRenewDelegationTokenRetrieved;
|
||||
@Metric("Total number of successful Retrieved CancelDelegationToken and latency(ms)")
|
||||
private MutableRate totalSucceededCancelDelegationTokenRetrieved;
|
||||
@Metric("Total number of successful Retrieved DumpSchedulerLogs and latency(ms)")
|
||||
private MutableRate totalSucceededDumpSchedulerLogsRetrieved;
|
||||
@Metric("Total number of successful Retrieved GetActivities and latency(ms)")
|
||||
private MutableRate totalSucceededGetActivitiesRetrieved;
|
||||
@Metric("Total number of successful Retrieved GetBulkActivities and latency(ms)")
|
||||
|
@ -303,6 +307,7 @@ public final class RouterMetrics {
|
|||
private MutableQuantiles getDelegationTokenLatency;
|
||||
private MutableQuantiles renewDelegationTokenLatency;
|
||||
private MutableQuantiles cancelDelegationTokenLatency;
|
||||
private MutableQuantiles dumpSchedulerLogsLatency;
|
||||
private MutableQuantiles getActivitiesLatency;
|
||||
private MutableQuantiles getBulkActivitiesLatency;
|
||||
private MutableQuantiles getSchedulerInfoRetrievedLatency;
|
||||
|
@ -482,6 +487,9 @@ public final class RouterMetrics {
|
|||
cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
|
||||
"latency of cancel delegation token timeouts", "ops", "latency", 10);
|
||||
|
||||
dumpSchedulerLogsLatency = registry.newQuantiles("dumpSchedulerLogsLatency",
|
||||
"latency of dump scheduler logs timeouts", "ops", "latency", 10);
|
||||
|
||||
getActivitiesLatency = registry.newQuantiles("getActivitiesLatency",
|
||||
"latency of get activities timeouts", "ops", "latency", 10);
|
||||
|
||||
|
@ -752,6 +760,11 @@ public final class RouterMetrics {
|
|||
return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededDumpSchedulerLogsRetrieved() {
|
||||
return totalSucceededDumpSchedulerLogsRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededGetActivitiesRetrieved() {
|
||||
return totalSucceededGetActivitiesRetrieved.lastStat().numSamples();
|
||||
|
@ -1007,6 +1020,11 @@ public final class RouterMetrics {
|
|||
return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededDumpSchedulerLogsRetrieved() {
|
||||
return totalSucceededDumpSchedulerLogsRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededGetActivitiesRetrieved() {
|
||||
return totalSucceededGetActivitiesRetrieved.lastStat().mean();
|
||||
|
@ -1245,6 +1263,10 @@ public final class RouterMetrics {
|
|||
return numCancelDelegationTokenFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getDumpSchedulerLogsFailedRetrieved() {
|
||||
return numDumpSchedulerLogsFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getActivitiesFailedRetrieved() {
|
||||
return numGetActivitiesFailedRetrieved.value();
|
||||
}
|
||||
|
@ -1492,6 +1514,11 @@ public final class RouterMetrics {
|
|||
cancelDelegationTokenLatency.add(duration);
|
||||
}
|
||||
|
||||
/**
 * Records one successful dumpSchedulerLogs call by adding the given duration
 * to both the success rate metric and the latency quantiles.
 *
 * @param duration the value added to both metrics (the metric description
 *     labels it as latency in ms)
 */
public void succeededDumpSchedulerLogsRetrieved(long duration) {
|
||||
totalSucceededDumpSchedulerLogsRetrieved.add(duration);
|
||||
dumpSchedulerLogsLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededGetActivitiesLatencyRetrieved(long duration) {
|
||||
totalSucceededGetActivitiesRetrieved.add(duration);
|
||||
getActivitiesLatency.add(duration);
|
||||
|
@ -1713,6 +1740,10 @@ public final class RouterMetrics {
|
|||
numCancelDelegationTokenFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
/**
 * Increments the gauge that counts failed dumpSchedulerLogs calls.
 */
public void incrDumpSchedulerLogsFailedRetrieved() {
|
||||
numDumpSchedulerLogsFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
/**
 * Increments the gauge that counts failed getActivities calls.
 */
public void incrGetActivitiesFailedRetrieved() {
|
||||
numGetActivitiesFailedRetrieved.incr();
|
||||
}
|
||||
|
|
|
@ -1183,10 +1183,70 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
throw new RuntimeException("getSchedulerInfo error.");
|
||||
}
|
||||
|
||||
/**
|
||||
* This method dumps the scheduler logs for the time got in input, and it is
|
||||
* reachable by using {@link RMWSConsts#SCHEDULER_LOGS}.
|
||||
*
|
||||
* @param time the period of time. It is a FormParam.
|
||||
* @param hsr the servlet request
|
||||
* @return the result of the operation
|
||||
* @throws IOException when it cannot create dump log file
|
||||
*/
|
||||
@Override
|
||||
public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
|
||||
throws IOException {
|
||||
throw new NotImplementedException("Code is not implemented");
|
||||
|
||||
// Step1. We will check the time parameter to
|
||||
// ensure that the time parameter is not empty and greater than 0.
|
||||
|
||||
if (StringUtils.isBlank(time)) {
|
||||
routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
|
||||
throw new IllegalArgumentException("Parameter error, the time is empty or null.");
|
||||
}
|
||||
|
||||
try {
|
||||
int period = Integer.parseInt(time);
|
||||
if (period <= 0) {
|
||||
throw new IllegalArgumentException("time must be greater than 0.");
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
|
||||
throw new IllegalArgumentException("time must be a number.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Step2. Call dumpSchedulerLogs of each subcluster.
|
||||
try {
|
||||
long startTime = clock.getTime();
|
||||
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
||||
final HttpServletRequest hsrCopy = clone(hsr);
|
||||
Class[] argsClasses = new Class[]{String.class, HttpServletRequest.class};
|
||||
Object[] args = new Object[]{time, hsrCopy};
|
||||
ClientMethod remoteMethod = new ClientMethod("dumpSchedulerLogs", argsClasses, args);
|
||||
Map<SubClusterInfo, String> dumpSchedulerLogsMap = invokeConcurrent(
|
||||
subClustersActive.values(), remoteMethod, String.class);
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
dumpSchedulerLogsMap.forEach((subClusterInfo, msg) -> {
|
||||
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
|
||||
stringBuilder.append("subClusterId" + subClusterId + " : " + msg + "; ");
|
||||
});
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededDumpSchedulerLogsRetrieved(stopTime - startTime);
|
||||
return stringBuilder.toString();
|
||||
} catch (IllegalArgumentException e) {
|
||||
routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowRunTimeException(e,
|
||||
"Unable to dump SchedulerLogs by time: %s.", time);
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowRunTimeException(e,
|
||||
"dumpSchedulerLogs by time = %s error .", time);
|
||||
}
|
||||
|
||||
routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
|
||||
throw new RuntimeException("dumpSchedulerLogs Failed.");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -534,6 +534,11 @@ public class TestRouterMetrics {
|
|||
metrics.incrRenewDelegationTokenFailedRetrieved();
|
||||
}
|
||||
|
||||
/**
 * Test helper that simulates a failed dumpSchedulerLogs call: logs the
 * mocked failure, then increments the failed dumpSchedulerLogs metric.
 */
public void getDumpSchedulerLogsFailed() {
|
||||
LOG.info("Mocked: failed DumpSchedulerLogs call");
|
||||
metrics.incrDumpSchedulerLogsFailedRetrieved();
|
||||
}
|
||||
|
||||
public void getActivitiesFailed() {
|
||||
LOG.info("Mocked: failed getBulkActivitie call");
|
||||
metrics.incrGetActivitiesFailedRetrieved();
|
||||
|
@ -774,6 +779,11 @@ public class TestRouterMetrics {
|
|||
metrics.succeededRenewDelegationTokenRetrieved(duration);
|
||||
}
|
||||
|
||||
/**
 * Test helper that simulates a successful dumpSchedulerLogs call: logs the
 * mocked success, then records the duration in the success metrics.
 *
 * @param duration the duration passed on to the metrics
 */
public void getDumpSchedulerLogsRetrieved(long duration) {
|
||||
LOG.info("Mocked: successful DumpSchedulerLogs call with duration {}", duration);
|
||||
metrics.succeededDumpSchedulerLogsRetrieved(duration);
|
||||
}
|
||||
|
||||
public void getActivitiesRetrieved(long duration) {
|
||||
LOG.info("Mocked: successful GetActivities call with duration {}", duration);
|
||||
metrics.succeededGetActivitiesLatencyRetrieved(duration);
|
||||
|
@ -1618,6 +1628,29 @@ public class TestRouterMetrics {
|
|||
metrics.getRenewDelegationTokenFailedRetrieved());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDumpSchedulerLogsRetrieved() {
|
||||
long totalGoodBefore = metrics.getNumSucceededDumpSchedulerLogsRetrieved();
|
||||
goodSubCluster.getDumpSchedulerLogsRetrieved(150);
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededDumpSchedulerLogsRetrieved());
|
||||
Assert.assertEquals(150,
|
||||
metrics.getLatencySucceededDumpSchedulerLogsRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
goodSubCluster.getDumpSchedulerLogsRetrieved(300);
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededDumpSchedulerLogsRetrieved());
|
||||
Assert.assertEquals(225,
|
||||
metrics.getLatencySucceededDumpSchedulerLogsRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDumpSchedulerLogsRetrievedFailed() {
|
||||
long totalBadBefore = metrics.getDumpSchedulerLogsFailedRetrieved();
|
||||
badSubCluster.getDumpSchedulerLogsFailed();
|
||||
Assert.assertEquals(totalBadBefore + 1,
|
||||
metrics.getDumpSchedulerLogsFailedRetrieved());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetActivitiesRetrieved() {
|
||||
long totalGoodBefore = metrics.getNumSucceededGetActivitiesRetrieved();
|
||||
|
|
|
@ -1214,6 +1214,26 @@ public class MockDefaultRequestInterceptorREST
|
|||
|
||||
return new RMQueueAclInfo(true, user.getUserName(), "");
|
||||
}
|
||||
|
||||
public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
|
||||
throws IOException {
|
||||
|
||||
int period = Integer.parseInt(time);
|
||||
if (period <= 0) {
|
||||
throw new BadRequestException("Period must be greater than 0");
|
||||
}
|
||||
|
||||
return "Capacity scheduler logs are being created.";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String dumpSchedulerLogs(String time, HttpServletRequest hsr) throws IOException {
|
||||
ResourceManager mockResourceManager = mock(ResourceManager.class);
|
||||
Configuration conf = new YarnConfiguration();
|
||||
MockRMWebServices webSvc = new MockRMWebServices(mockResourceManager, conf,
|
||||
mock(HttpServletResponse.class));
|
||||
return webSvc.dumpSchedulerLogs(time, hsr);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -134,7 +134,6 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT;
|
||||
|
@ -1785,6 +1784,41 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
|
|||
Assert.assertEquals(response.getStatus(), Status.OK.getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDumpSchedulerLogs() throws Exception {
|
||||
HttpServletRequest mockHsr = mockHttpServletRequestByUserName("admin");
|
||||
String dumpSchedulerLogsMsg = interceptor.dumpSchedulerLogs("1", mockHsr);
|
||||
|
||||
// We cannot guarantee the calling order of the sub-clusters,
|
||||
// We guarantee that the returned result contains the information of each subCluster.
|
||||
Assert.assertNotNull(dumpSchedulerLogsMsg);
|
||||
subClusters.stream().forEach(subClusterId -> {
|
||||
String subClusterMsg =
|
||||
"subClusterId" + subClusterId + " : Capacity scheduler logs are being created.; ";
|
||||
Assert.assertTrue(dumpSchedulerLogsMsg.contains(subClusterMsg));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDumpSchedulerLogsError() throws Exception {
|
||||
HttpServletRequest mockHsr = mockHttpServletRequestByUserName("admin");
|
||||
|
||||
// time is empty
|
||||
LambdaTestUtils.intercept(IllegalArgumentException.class,
|
||||
"Parameter error, the time is empty or null.",
|
||||
() -> interceptor.dumpSchedulerLogs(null, mockHsr));
|
||||
|
||||
// time is negative
|
||||
LambdaTestUtils.intercept(IllegalArgumentException.class,
|
||||
"time must be greater than 0.",
|
||||
() -> interceptor.dumpSchedulerLogs("-1", mockHsr));
|
||||
|
||||
// time is non-numeric
|
||||
LambdaTestUtils.intercept(IllegalArgumentException.class,
|
||||
"time must be a number.",
|
||||
() -> interceptor.dumpSchedulerLogs("abc", mockHsr));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetActivitiesNormal() {
|
||||
ActivitiesInfo activitiesInfo = interceptor.getActivities(null, "1", "DIAGNOSTIC");
|
||||
|
|
Loading…
Reference in New Issue