YARN-3671. Integrate Federation services with ResourceManager. Contributed by Subru Krishnan
This commit is contained in:
parent
3307564a5f
commit
8573c286e2
|
@ -2568,9 +2568,6 @@ public class YarnConfiguration extends Configuration {
|
||||||
FEDERATION_PREFIX + "failover.enabled";
|
FEDERATION_PREFIX + "failover.enabled";
|
||||||
public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true;
|
public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true;
|
||||||
|
|
||||||
public static final String FEDERATION_SUBCLUSTER_ID =
|
|
||||||
FEDERATION_PREFIX + "sub-cluster.id";
|
|
||||||
|
|
||||||
public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
|
public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
|
||||||
FEDERATION_PREFIX + "state-store.class";
|
FEDERATION_PREFIX + "state-store.class";
|
||||||
|
|
||||||
|
@ -2583,6 +2580,14 @@ public class YarnConfiguration extends Configuration {
|
||||||
// 5 minutes
|
// 5 minutes
|
||||||
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
|
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
|
||||||
|
|
||||||
|
public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
|
||||||
|
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";
|
||||||
|
|
||||||
|
// 5 minutes
|
||||||
|
public static final int
|
||||||
|
DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
|
||||||
|
5 * 60;
|
||||||
|
|
||||||
public static final String FEDERATION_MACHINE_LIST =
|
public static final String FEDERATION_MACHINE_LIST =
|
||||||
FEDERATION_PREFIX + "machine-list";
|
FEDERATION_PREFIX + "machine-list";
|
||||||
|
|
||||||
|
|
|
@ -71,10 +71,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
|
||||||
// Federation default configs to be ignored
|
// Federation default configs to be ignored
|
||||||
configurationPropsToSkipCompare
|
configurationPropsToSkipCompare
|
||||||
.add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
|
.add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
|
||||||
configurationPropsToSkipCompare
|
|
||||||
.add(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
|
|
||||||
configurationPropsToSkipCompare
|
configurationPropsToSkipCompare
|
||||||
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
|
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
|
||||||
|
configurationPropsToSkipCompare
|
||||||
|
.add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
|
||||||
|
|
||||||
// Ignore blacklisting nodes for AM failures feature since it is still a
|
// Ignore blacklisting nodes for AM failures feature since it is still a
|
||||||
// "work in progress"
|
// "work in progress"
|
||||||
|
|
|
@ -134,7 +134,7 @@ public final class FederationProxyProviderUtil {
|
||||||
// are based out of conf
|
// are based out of conf
|
||||||
private static void updateConf(Configuration conf,
|
private static void updateConf(Configuration conf,
|
||||||
SubClusterId subClusterId) {
|
SubClusterId subClusterId) {
|
||||||
conf.set(YarnConfiguration.FEDERATION_SUBCLUSTER_ID, subClusterId.getId());
|
conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
|
||||||
// In a Federation setting, we will connect to not just the local cluster RM
|
// In a Federation setting, we will connect to not just the local cluster RM
|
||||||
// but also multiple external RMs. The membership information of all the RMs
|
// but also multiple external RMs. The membership information of all the RMs
|
||||||
// that are currently
|
// that are currently
|
||||||
|
|
|
@ -74,8 +74,8 @@ public class FederationRMFailoverProxyProvider<T>
|
||||||
this.protocol = proto;
|
this.protocol = proto;
|
||||||
this.rmProxy.checkAllowedProtocols(this.protocol);
|
this.rmProxy.checkAllowedProtocols(this.protocol);
|
||||||
String clusterId =
|
String clusterId =
|
||||||
configuration.get(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
|
configuration.get(YarnConfiguration.RM_CLUSTER_ID);
|
||||||
Preconditions.checkNotNull(clusterId, "Missing Federation SubClusterId");
|
Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
|
||||||
this.subClusterId = SubClusterId.newInstance(clusterId);
|
this.subClusterId = SubClusterId.newInstance(clusterId);
|
||||||
this.facade = facade.getInstance();
|
this.facade = facade.getInstance();
|
||||||
if (configuration instanceof YarnConfiguration) {
|
if (configuration instanceof YarnConfiguration) {
|
||||||
|
|
|
@ -159,7 +159,10 @@ public final class FederationMembershipStateStoreInputValidator {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validate if the SubCluster Info are present or not.
|
* Validate if all the required fields on {@link SubClusterInfo} are present
|
||||||
|
* or not. {@code Capability} will be empty as the corresponding
|
||||||
|
* {@code ResourceManager} is in the process of initialization during
|
||||||
|
* registration.
|
||||||
*
|
*
|
||||||
* @param subClusterInfo the information of the subcluster to be verified
|
* @param subClusterInfo the information of the subcluster to be verified
|
||||||
* @throws FederationStateStoreInvalidInputException if the SubCluster Info
|
* @throws FederationStateStoreInvalidInputException if the SubCluster Info
|
||||||
|
@ -194,8 +197,6 @@ public final class FederationMembershipStateStoreInputValidator {
|
||||||
// validate subcluster state
|
// validate subcluster state
|
||||||
checkSubClusterState(subClusterInfo.getState());
|
checkSubClusterState(subClusterInfo.getState());
|
||||||
|
|
||||||
// validate subcluster capability
|
|
||||||
checkCapability(subClusterInfo.getCapability());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -242,11 +242,8 @@ public class TestFederationStateStoreInputValidator {
|
||||||
SubClusterRegisterRequest.newInstance(subClusterInfo);
|
SubClusterRegisterRequest.newInstance(subClusterInfo);
|
||||||
FederationMembershipStateStoreInputValidator
|
FederationMembershipStateStoreInputValidator
|
||||||
.validateSubClusterRegisterRequest(request);
|
.validateSubClusterRegisterRequest(request);
|
||||||
Assert.fail();
|
|
||||||
} catch (FederationStateStoreInvalidInputException e) {
|
} catch (FederationStateStoreInvalidInputException e) {
|
||||||
LOG.info(e.getMessage());
|
Assert.fail(e.getMessage());
|
||||||
Assert.assertTrue(
|
|
||||||
e.getMessage().startsWith("Invalid capability information."));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execution with Empty Capability
|
// Execution with Empty Capability
|
||||||
|
@ -260,11 +257,8 @@ public class TestFederationStateStoreInputValidator {
|
||||||
SubClusterRegisterRequest.newInstance(subClusterInfo);
|
SubClusterRegisterRequest.newInstance(subClusterInfo);
|
||||||
FederationMembershipStateStoreInputValidator
|
FederationMembershipStateStoreInputValidator
|
||||||
.validateSubClusterRegisterRequest(request);
|
.validateSubClusterRegisterRequest(request);
|
||||||
Assert.fail();
|
|
||||||
} catch (FederationStateStoreInvalidInputException e) {
|
} catch (FederationStateStoreInvalidInputException e) {
|
||||||
LOG.info(e.getMessage());
|
Assert.fail(e.getMessage());
|
||||||
Assert.assertTrue(
|
|
||||||
e.getMessage().startsWith("Invalid capability information."));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
|
import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
|
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
|
||||||
|
@ -185,6 +186,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
protected RMAppManager rmAppManager;
|
protected RMAppManager rmAppManager;
|
||||||
protected ApplicationACLsManager applicationACLsManager;
|
protected ApplicationACLsManager applicationACLsManager;
|
||||||
protected QueueACLsManager queueACLsManager;
|
protected QueueACLsManager queueACLsManager;
|
||||||
|
private FederationStateStoreService federationStateStoreService;
|
||||||
private WebApp webApp;
|
private WebApp webApp;
|
||||||
private AppReportFetcher fetcher = null;
|
private AppReportFetcher fetcher = null;
|
||||||
protected ResourceTrackerService resourceTracker;
|
protected ResourceTrackerService resourceTracker;
|
||||||
|
@ -499,6 +501,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
return new RMTimelineCollectorManager(this);
|
return new RMTimelineCollectorManager(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private FederationStateStoreService createFederationStateStoreService() {
|
||||||
|
return new FederationStateStoreService(rmContext);
|
||||||
|
}
|
||||||
|
|
||||||
protected SystemMetricsPublisher createSystemMetricsPublisher() {
|
protected SystemMetricsPublisher createSystemMetricsPublisher() {
|
||||||
SystemMetricsPublisher publisher;
|
SystemMetricsPublisher publisher;
|
||||||
if (YarnConfiguration.timelineServiceEnabled(conf) &&
|
if (YarnConfiguration.timelineServiceEnabled(conf) &&
|
||||||
|
@ -724,6 +730,20 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
delegationTokenRenewer.setRMContext(rmContext);
|
delegationTokenRenewer.setRMContext(rmContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(HAUtil.isFederationEnabled(conf)) {
|
||||||
|
String cId = YarnConfiguration.getClusterId(conf);
|
||||||
|
if (cId.isEmpty()) {
|
||||||
|
String errMsg =
|
||||||
|
"Cannot initialize RM as Federation is enabled"
|
||||||
|
+ " but cluster id is not configured.";
|
||||||
|
LOG.error(errMsg);
|
||||||
|
throw new YarnRuntimeException(errMsg);
|
||||||
|
}
|
||||||
|
federationStateStoreService = createFederationStateStoreService();
|
||||||
|
addIfService(federationStateStoreService);
|
||||||
|
LOG.info("Initialized Federation membership.");
|
||||||
|
}
|
||||||
|
|
||||||
new RMNMInfo(rmContext, scheduler);
|
new RMNMInfo(rmContext, scheduler);
|
||||||
|
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
|
@ -1349,6 +1369,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
return this.queueACLsManager;
|
return this.queueACLsManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
public FederationStateStoreService getFederationStateStoreService() {
|
||||||
|
return this.federationStateStoreService;
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
WebApp getWebapp() {
|
WebApp getWebapp() {
|
||||||
return this.webApp;
|
return this.webApp;
|
||||||
|
|
|
@ -0,0 +1,108 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.federation;
|
||||||
|
|
||||||
|
import java.io.StringWriter;
|
||||||
|
|
||||||
|
import javax.xml.bind.JAXBException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.sun.jersey.api.json.JSONConfiguration;
|
||||||
|
import com.sun.jersey.api.json.JSONJAXBContext;
|
||||||
|
import com.sun.jersey.api.json.JSONMarshaller;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Periodic heart beat from a <code>ResourceManager</code> participating in
|
||||||
|
* federation to indicate liveliness. The heart beat publishes the current
|
||||||
|
* capabilities as represented by {@link ClusterMetricsInfo} of the sub cluster.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class FederationStateStoreHeartbeat implements Runnable {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(FederationStateStoreHeartbeat.class);
|
||||||
|
|
||||||
|
private SubClusterId subClusterId;
|
||||||
|
private FederationStateStore stateStoreService;
|
||||||
|
|
||||||
|
private final ResourceScheduler rs;
|
||||||
|
|
||||||
|
private StringWriter currentClusterState;
|
||||||
|
private JSONJAXBContext jc;
|
||||||
|
private JSONMarshaller marshaller;
|
||||||
|
private String capability;
|
||||||
|
|
||||||
|
public FederationStateStoreHeartbeat(SubClusterId subClusterId,
|
||||||
|
FederationStateStore stateStoreClient, ResourceScheduler scheduler) {
|
||||||
|
this.stateStoreService = stateStoreClient;
|
||||||
|
this.subClusterId = subClusterId;
|
||||||
|
this.rs = scheduler;
|
||||||
|
// Initialize the JAXB Marshaller
|
||||||
|
this.currentClusterState = new StringWriter();
|
||||||
|
try {
|
||||||
|
this.jc = new JSONJAXBContext(
|
||||||
|
JSONConfiguration.mapped().rootUnwrapping(false).build(),
|
||||||
|
ClusterMetricsInfo.class);
|
||||||
|
marshaller = jc.createJSONMarshaller();
|
||||||
|
} catch (JAXBException e) {
|
||||||
|
LOG.warn("Exception while trying to initialize JAXB context.", e);
|
||||||
|
}
|
||||||
|
LOG.info("Initialized Federation membership for cluster with timestamp: "
|
||||||
|
+ ResourceManager.getClusterTimeStamp());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current cluster state as a JSON string representation of the
|
||||||
|
* {@link ClusterMetricsInfo}.
|
||||||
|
*/
|
||||||
|
private void updateClusterState() {
|
||||||
|
try {
|
||||||
|
// get the current state
|
||||||
|
currentClusterState.getBuffer().setLength(0);
|
||||||
|
ClusterMetricsInfo clusterMetricsInfo = new ClusterMetricsInfo(rs);
|
||||||
|
marshaller.marshallToJSON(clusterMetricsInfo, currentClusterState);
|
||||||
|
capability = currentClusterState.toString();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Exception while trying to generate cluster state,"
|
||||||
|
+ " so reverting to last know state.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void run() {
|
||||||
|
try {
|
||||||
|
updateClusterState();
|
||||||
|
SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
|
||||||
|
.newInstance(subClusterId, SubClusterState.SC_RUNNING, capability);
|
||||||
|
stateStoreService.subClusterHeartbeat(request);
|
||||||
|
LOG.debug("Sending the heartbeat with capability: {}", capability);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Exception when trying to heartbeat: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,304 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.federation;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||||
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implements {@link FederationStateStore} and provides a service for
|
||||||
|
* participating in the federation membership.
|
||||||
|
*/
|
||||||
|
public class FederationStateStoreService extends AbstractService
|
||||||
|
implements FederationStateStore {
|
||||||
|
|
||||||
|
public static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(FederationStateStoreService.class);
|
||||||
|
|
||||||
|
private Configuration config;
|
||||||
|
private ScheduledExecutorService scheduledExecutorService;
|
||||||
|
private FederationStateStoreHeartbeat stateStoreHeartbeat;
|
||||||
|
private FederationStateStore stateStoreClient = null;
|
||||||
|
private SubClusterId subClusterId;
|
||||||
|
private long heartbeatInterval;
|
||||||
|
private RMContext rmContext;
|
||||||
|
|
||||||
|
public FederationStateStoreService(RMContext rmContext) {
|
||||||
|
super(FederationStateStoreService.class.getName());
|
||||||
|
LOG.info("FederationStateStoreService initialized");
|
||||||
|
this.rmContext = rmContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
|
||||||
|
this.config = conf;
|
||||||
|
|
||||||
|
RetryPolicy retryPolicy =
|
||||||
|
FederationStateStoreFacade.createRetryPolicy(conf);
|
||||||
|
|
||||||
|
this.stateStoreClient =
|
||||||
|
(FederationStateStore) FederationStateStoreFacade.createRetryInstance(
|
||||||
|
conf, YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
|
||||||
|
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
|
||||||
|
FederationStateStore.class, retryPolicy);
|
||||||
|
this.stateStoreClient.init(conf);
|
||||||
|
LOG.info("Initialized state store client class");
|
||||||
|
|
||||||
|
this.subClusterId =
|
||||||
|
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
|
||||||
|
|
||||||
|
heartbeatInterval = conf.getLong(
|
||||||
|
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS,
|
||||||
|
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
|
||||||
|
if (heartbeatInterval <= 0) {
|
||||||
|
heartbeatInterval =
|
||||||
|
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS;
|
||||||
|
}
|
||||||
|
LOG.info("Initialized federation membership service.");
|
||||||
|
|
||||||
|
super.serviceInit(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
|
||||||
|
registerAndInitializeHeartbeat();
|
||||||
|
|
||||||
|
super.serviceStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
Exception ex = null;
|
||||||
|
try {
|
||||||
|
if (this.scheduledExecutorService != null
|
||||||
|
&& !this.scheduledExecutorService.isShutdown()) {
|
||||||
|
this.scheduledExecutorService.shutdown();
|
||||||
|
LOG.info("Stopped federation membership heartbeat");
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Failed to shutdown ScheduledExecutorService", e);
|
||||||
|
ex = e;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.stateStoreClient != null) {
|
||||||
|
try {
|
||||||
|
deregisterSubCluster(SubClusterDeregisterRequest
|
||||||
|
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
|
||||||
|
} finally {
|
||||||
|
this.stateStoreClient.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ex != null) {
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return a client accessible string representation of the service address.
|
||||||
|
private String getServiceAddress(InetSocketAddress address) {
|
||||||
|
InetSocketAddress socketAddress = NetUtils.getConnectAddress(address);
|
||||||
|
return socketAddress.getAddress().getHostAddress() + ":"
|
||||||
|
+ socketAddress.getPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void registerAndInitializeHeartbeat() {
|
||||||
|
String clientRMAddress =
|
||||||
|
getServiceAddress(rmContext.getClientRMService().getBindAddress());
|
||||||
|
String amRMAddress = getServiceAddress(
|
||||||
|
rmContext.getApplicationMasterService().getBindAddress());
|
||||||
|
String rmAdminAddress = getServiceAddress(
|
||||||
|
config.getSocketAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
|
||||||
|
String webAppAddress =
|
||||||
|
WebAppUtils.getResolvedRemoteRMWebAppURLWithoutScheme(config);
|
||||||
|
|
||||||
|
SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
|
||||||
|
amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
|
||||||
|
SubClusterState.SC_NEW, ResourceManager.getClusterTimeStamp(), "");
|
||||||
|
try {
|
||||||
|
registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||||
|
LOG.info("Successfully registered for federation subcluster: {}",
|
||||||
|
subClusterInfo);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new YarnRuntimeException(
|
||||||
|
"Failed to register Federation membership with the StateStore", e);
|
||||||
|
}
|
||||||
|
stateStoreHeartbeat = new FederationStateStoreHeartbeat(subClusterId,
|
||||||
|
stateStoreClient, rmContext.getScheduler());
|
||||||
|
scheduledExecutorService =
|
||||||
|
HadoopExecutors.newSingleThreadScheduledExecutor();
|
||||||
|
scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat,
|
||||||
|
heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS);
|
||||||
|
LOG.info("Started federation membership heartbeat with interval: {}",
|
||||||
|
heartbeatInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public FederationStateStore getStateStoreClient() {
|
||||||
|
return stateStoreClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public FederationStateStoreHeartbeat getStateStoreHeartbeatThread() {
|
||||||
|
return stateStoreHeartbeat;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Version getCurrentVersion() {
|
||||||
|
return stateStoreClient.getCurrentVersion();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Version loadVersion() {
|
||||||
|
return stateStoreClient.getCurrentVersion();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
|
||||||
|
GetSubClusterPolicyConfigurationRequest request) throws YarnException {
|
||||||
|
return stateStoreClient.getPolicyConfiguration(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
|
||||||
|
SetSubClusterPolicyConfigurationRequest request) throws YarnException {
|
||||||
|
return stateStoreClient.setPolicyConfiguration(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
|
||||||
|
GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
|
||||||
|
return stateStoreClient.getPoliciesConfigurations(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SubClusterRegisterResponse registerSubCluster(
|
||||||
|
SubClusterRegisterRequest registerSubClusterRequest)
|
||||||
|
throws YarnException {
|
||||||
|
return stateStoreClient.registerSubCluster(registerSubClusterRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SubClusterDeregisterResponse deregisterSubCluster(
|
||||||
|
SubClusterDeregisterRequest subClusterDeregisterRequest)
|
||||||
|
throws YarnException {
|
||||||
|
return stateStoreClient.deregisterSubCluster(subClusterDeregisterRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SubClusterHeartbeatResponse subClusterHeartbeat(
|
||||||
|
SubClusterHeartbeatRequest subClusterHeartbeatRequest)
|
||||||
|
throws YarnException {
|
||||||
|
return stateStoreClient.subClusterHeartbeat(subClusterHeartbeatRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetSubClusterInfoResponse getSubCluster(
|
||||||
|
GetSubClusterInfoRequest subClusterRequest) throws YarnException {
|
||||||
|
return stateStoreClient.getSubCluster(subClusterRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetSubClustersInfoResponse getSubClusters(
|
||||||
|
GetSubClustersInfoRequest subClustersRequest) throws YarnException {
|
||||||
|
return stateStoreClient.getSubClusters(subClustersRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
|
||||||
|
AddApplicationHomeSubClusterRequest request) throws YarnException {
|
||||||
|
return stateStoreClient.addApplicationHomeSubCluster(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
|
||||||
|
UpdateApplicationHomeSubClusterRequest request) throws YarnException {
|
||||||
|
return stateStoreClient.updateApplicationHomeSubCluster(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
|
||||||
|
GetApplicationHomeSubClusterRequest request) throws YarnException {
|
||||||
|
return stateStoreClient.getApplicationHomeSubCluster(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
|
||||||
|
GetApplicationsHomeSubClusterRequest request) throws YarnException {
|
||||||
|
return stateStoreClient.getApplicationsHomeSubCluster(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
|
||||||
|
DeleteApplicationHomeSubClusterRequest request) throws YarnException {
|
||||||
|
return stateStoreClient.deleteApplicationHomeSubCluster(request);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.federation;
|
|
@ -65,7 +65,10 @@ public class ClusterMetricsInfo {
|
||||||
} // JAXB needs this
|
} // JAXB needs this
|
||||||
|
|
||||||
public ClusterMetricsInfo(final ResourceManager rm) {
|
public ClusterMetricsInfo(final ResourceManager rm) {
|
||||||
ResourceScheduler rs = rm.getResourceScheduler();
|
this(rm.getResourceScheduler());
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClusterMetricsInfo(final ResourceScheduler rs) {
|
||||||
QueueMetrics metrics = rs.getRootQueueMetrics();
|
QueueMetrics metrics = rs.getRootQueueMetrics();
|
||||||
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
|
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,170 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.federation;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.StringReader;
|
||||||
|
|
||||||
|
import javax.xml.bind.JAXBException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.sun.jersey.api.json.JSONConfiguration;
|
||||||
|
import com.sun.jersey.api.json.JSONJAXBContext;
|
||||||
|
import com.sun.jersey.api.json.JSONUnmarshaller;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for FederationStateStoreService.
|
||||||
|
*/
|
||||||
|
public class TestFederationRMStateStoreService {
|
||||||
|
|
||||||
|
private final HAServiceProtocol.StateChangeRequestInfo requestInfo =
|
||||||
|
new HAServiceProtocol.StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||||
|
private final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
|
||||||
|
private final GetSubClusterInfoRequest request =
|
||||||
|
GetSubClusterInfoRequest.newInstance(subClusterId);
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private FederationStateStore stateStore;
|
||||||
|
private long lastHearbeatTS = 0;
|
||||||
|
private JSONJAXBContext jc;
|
||||||
|
private JSONUnmarshaller unmarshaller;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException, YarnException, JAXBException {
|
||||||
|
conf = new YarnConfiguration();
|
||||||
|
jc = new JSONJAXBContext(
|
||||||
|
JSONConfiguration.mapped().rootUnwrapping(false).build(),
|
||||||
|
ClusterMetricsInfo.class);
|
||||||
|
unmarshaller = jc.createJSONUnmarshaller();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
unmarshaller = null;
|
||||||
|
jc = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFederationStateStoreService() throws Exception {
|
||||||
|
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
|
||||||
|
conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
|
||||||
|
final MockRM rm = new MockRM(conf);
|
||||||
|
|
||||||
|
// Initially there should be no entry for the sub-cluster
|
||||||
|
rm.init(conf);
|
||||||
|
stateStore = rm.getFederationStateStoreService().getStateStoreClient();
|
||||||
|
try {
|
||||||
|
stateStore.getSubCluster(request);
|
||||||
|
Assert.fail("There should be no entry for the sub-cluster.");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.assertTrue(e.getMessage().endsWith("does not exist"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate if sub-cluster is registered
|
||||||
|
rm.start();
|
||||||
|
String capability = checkSubClusterInfo(SubClusterState.SC_NEW);
|
||||||
|
Assert.assertTrue(capability.isEmpty());
|
||||||
|
|
||||||
|
// Heartbeat to see if sub-cluster transitions to running
|
||||||
|
FederationStateStoreHeartbeat storeHeartbeat =
|
||||||
|
rm.getFederationStateStoreService().getStateStoreHeartbeatThread();
|
||||||
|
storeHeartbeat.run();
|
||||||
|
capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
|
||||||
|
checkClusterMetricsInfo(capability, 0);
|
||||||
|
|
||||||
|
// heartbeat again after adding a node.
|
||||||
|
rm.registerNode("127.0.0.1:1234", 4 * 1024);
|
||||||
|
storeHeartbeat.run();
|
||||||
|
capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
|
||||||
|
checkClusterMetricsInfo(capability, 1);
|
||||||
|
|
||||||
|
// Validate sub-cluster deregistration
|
||||||
|
rm.getFederationStateStoreService()
|
||||||
|
.deregisterSubCluster(SubClusterDeregisterRequest
|
||||||
|
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
|
||||||
|
checkSubClusterInfo(SubClusterState.SC_UNREGISTERED);
|
||||||
|
|
||||||
|
// check after failover
|
||||||
|
explicitFailover(rm);
|
||||||
|
|
||||||
|
capability = checkSubClusterInfo(SubClusterState.SC_NEW);
|
||||||
|
Assert.assertTrue(capability.isEmpty());
|
||||||
|
|
||||||
|
// Heartbeat to see if sub-cluster transitions to running
|
||||||
|
storeHeartbeat =
|
||||||
|
rm.getFederationStateStoreService().getStateStoreHeartbeatThread();
|
||||||
|
storeHeartbeat.run();
|
||||||
|
capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
|
||||||
|
checkClusterMetricsInfo(capability, 0);
|
||||||
|
|
||||||
|
// heartbeat again after adding a node.
|
||||||
|
rm.registerNode("127.0.0.1:1234", 4 * 1024);
|
||||||
|
storeHeartbeat.run();
|
||||||
|
capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
|
||||||
|
checkClusterMetricsInfo(capability, 1);
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void explicitFailover(MockRM rm) throws IOException {
|
||||||
|
rm.getAdminService().transitionToStandby(requestInfo);
|
||||||
|
Assert.assertTrue(rm.getRMContext()
|
||||||
|
.getHAServiceState() == HAServiceProtocol.HAServiceState.STANDBY);
|
||||||
|
rm.getAdminService().transitionToActive(requestInfo);
|
||||||
|
Assert.assertTrue(rm.getRMContext()
|
||||||
|
.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE);
|
||||||
|
lastHearbeatTS = 0;
|
||||||
|
stateStore = rm.getFederationStateStoreService().getStateStoreClient();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkClusterMetricsInfo(String capability, int numNodes)
|
||||||
|
throws JAXBException {
|
||||||
|
ClusterMetricsInfo clusterMetricsInfo = unmarshaller.unmarshalFromJSON(
|
||||||
|
new StringReader(capability), ClusterMetricsInfo.class);
|
||||||
|
Assert.assertEquals(numNodes, clusterMetricsInfo.getTotalNodes());
|
||||||
|
}
|
||||||
|
|
||||||
|
private String checkSubClusterInfo(SubClusterState state)
|
||||||
|
throws YarnException {
|
||||||
|
Assert.assertNotNull(stateStore.getSubCluster(request));
|
||||||
|
SubClusterInfo response =
|
||||||
|
stateStore.getSubCluster(request).getSubClusterInfo();
|
||||||
|
Assert.assertEquals(state, response.getState());
|
||||||
|
Assert.assertTrue(response.getLastHeartBeat() >= lastHearbeatTS);
|
||||||
|
lastHearbeatTS = response.getLastHeartBeat();
|
||||||
|
return response.getCapability();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue