YARN-1368. Added core functionality of recovering container state into schedulers after ResourceManager Restart so as to preserve running work in the cluster. Contributed by Jian He.
svn merge --ignore-ancestry -c 1601303 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1601304 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d678a9ecf9
commit
319e422733
|
@ -17,6 +17,10 @@ Release 2.5.0 - UNRELEASED
|
|||
YARN-1338. Recover localized resource cache state upon nodemanager restart
|
||||
(Jason Lowe via junping_du)
|
||||
|
||||
YARN-1368. Added core functionality of recovering container state into
|
||||
schedulers after ResourceManager Restart so as to preserve running work in
|
||||
the cluster. (Jian He via vinodkv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
|
||||
|
|
|
@ -318,6 +318,13 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
|
||||
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
|
||||
|
||||
@Private
|
||||
public static final String RM_WORK_PRESERVING_RECOVERY_ENABLED = RM_PREFIX
|
||||
+ "work-preserving-recovery.enabled";
|
||||
@Private
|
||||
public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED =
|
||||
false;
|
||||
|
||||
/** Zookeeper interaction configs */
|
||||
public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
|
||||
|
||||
|
|
|
@ -269,6 +269,14 @@
|
|||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Enable RM work preserving recovery. This configuration is private
|
||||
to YARN for experimenting with the feature.
|
||||
</description>
|
||||
<name>yarn.resourcemanager.work-preserving-recovery.enabled</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The class to use as the persistent store.
|
||||
|
||||
|
|
|
@ -99,4 +99,6 @@ public interface RMContext {
|
|||
RMApplicationHistoryWriter rmApplicationHistoryWriter);
|
||||
|
||||
ConfigurationProvider getConfigurationProvider();
|
||||
|
||||
boolean isWorkPreservingRecoveryEnabled();
|
||||
}
|
|
@ -60,6 +60,7 @@ public class RMContextImpl implements RMContext {
|
|||
= new ConcurrentHashMap<String, RMNode>();
|
||||
|
||||
private boolean isHAEnabled;
|
||||
private boolean isWorkPreservingRecoveryEnabled;
|
||||
private HAServiceState haServiceState =
|
||||
HAServiceProtocol.HAServiceState.INITIALIZING;
|
||||
|
||||
|
@ -329,6 +330,15 @@ public class RMContextImpl implements RMContext {
|
|||
}
|
||||
}
|
||||
|
||||
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
|
||||
this.isWorkPreservingRecoveryEnabled = enabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWorkPreservingRecoveryEnabled() {
|
||||
return this.isWorkPreservingRecoveryEnabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
|
||||
return rmApplicationHistoryWriter;
|
||||
|
|
|
@ -364,9 +364,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
|
||||
|
||||
RMStateStore rmStore = null;
|
||||
if(isRecoveryEnabled) {
|
||||
if (isRecoveryEnabled) {
|
||||
recoveryEnabled = true;
|
||||
rmStore = RMStateStoreFactory.getStore(conf);
|
||||
boolean isWorkPreservingRecoveryEnabled =
|
||||
conf.getBoolean(
|
||||
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
|
||||
rmContext
|
||||
.setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
|
||||
} else {
|
||||
recoveryEnabled = false;
|
||||
rmStore = new NullRMStateStore();
|
||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
|
@ -243,11 +244,13 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
Resource capability = request.getResource();
|
||||
String nodeManagerVersion = request.getNMVersion();
|
||||
|
||||
if (!rmContext.isWorkPreservingRecoveryEnabled()) {
|
||||
if (!request.getNMContainerStatuses().isEmpty()) {
|
||||
LOG.info("received container statuses on node manager register :"
|
||||
+ request.getNMContainerStatuses());
|
||||
for (NMContainerStatus report : request.getNMContainerStatuses()) {
|
||||
handleNMContainerStatus(report);
|
||||
for (NMContainerStatus status : request.getNMContainerStatuses()) {
|
||||
handleNMContainerStatus(status);
|
||||
}
|
||||
}
|
||||
}
|
||||
RegisterNodeManagerResponse response = recordFactory
|
||||
|
@ -308,7 +311,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
|
||||
if (oldNode == null) {
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
|
||||
new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses()));
|
||||
} else {
|
||||
LOG.info("Reconnect from the node at: " + host);
|
||||
this.nmLivelinessMonitor.unregister(nodeId);
|
||||
|
|
|
@ -723,29 +723,36 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
}
|
||||
}
|
||||
|
||||
// synchronously recover attempt to ensure any incoming external events
|
||||
// to be processed after the attempt processes the recover event.
|
||||
private void recoverAppAttempts() {
|
||||
for (RMAppAttempt attempt : getAppAttempts().values()) {
|
||||
attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(),
|
||||
RMAppAttemptEventType.RECOVER));
|
||||
}
|
||||
}
|
||||
|
||||
private static final class RMAppRecoveredTransition implements
|
||||
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
|
||||
|
||||
@Override
|
||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
|
||||
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
|
||||
// synchronously recover attempt to ensure any incoming external events
|
||||
// to be processed after the attempt processes the recover event.
|
||||
attempt.handle(
|
||||
new RMAppAttemptEvent(attempt.getAppAttemptId(),
|
||||
RMAppAttemptEventType.RECOVER));
|
||||
}
|
||||
|
||||
// The app has completed.
|
||||
if (app.recoveredFinalState != null) {
|
||||
app.recoverAppAttempts();
|
||||
new FinalTransition(app.recoveredFinalState).transition(app, event);
|
||||
return app.recoveredFinalState;
|
||||
}
|
||||
|
||||
// Last attempt is in final state, do not add to scheduler and just return
|
||||
// ACCEPTED waiting for last RMAppAttempt to send finished or failed event
|
||||
// back.
|
||||
// Notify scheduler about the app on recovery
|
||||
new AddApplicationToSchedulerTransition().transition(app, event);
|
||||
|
||||
// recover attempts
|
||||
app.recoverAppAttempts();
|
||||
|
||||
// Last attempt is in final state, return ACCEPTED waiting for last
|
||||
// RMAppAttempt to send finished or failed event back.
|
||||
if (app.currentAttempt != null
|
||||
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED
|
||||
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED
|
||||
|
@ -754,9 +761,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
return RMAppState.ACCEPTED;
|
||||
}
|
||||
|
||||
// Notify scheduler about the app on recovery
|
||||
new AddApplicationToSchedulerTransition().transition(app, event);
|
||||
|
||||
// No existent attempts means the attempt associated with this app was not
|
||||
// started or started but not yet saved.
|
||||
if (app.attempts.isEmpty()) {
|
||||
|
@ -1055,8 +1059,12 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
if (app.finishTime == 0 ) {
|
||||
app.finishTime = System.currentTimeMillis();
|
||||
}
|
||||
// Recovered apps that are completed were not added to scheduler, so no
|
||||
// need to remove them from scheduler.
|
||||
if (app.recoveredFinalState == null) {
|
||||
app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
|
||||
finalState));
|
||||
}
|
||||
app.handler.handle(
|
||||
new RMAppManagerEvent(app.applicationId,
|
||||
RMAppManagerEventType.APP_COMPLETED));
|
||||
|
|
|
@ -267,15 +267,17 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
|
||||
RMAppAttemptEventType.CONTAINER_FINISHED,
|
||||
new FinalSavingTransition(
|
||||
new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
|
||||
new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED))
|
||||
|
||||
// Transitions from LAUNCHED State
|
||||
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
|
||||
RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
|
||||
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
|
||||
.addTransition(RMAppAttemptState.LAUNCHED,
|
||||
EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING),
|
||||
RMAppAttemptEventType.CONTAINER_FINISHED,
|
||||
new FinalSavingTransition(
|
||||
new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
|
||||
new ContainerFinishedTransition(
|
||||
new AMContainerCrashedBeforeRunningTransition(),
|
||||
RMAppAttemptState.LAUNCHED))
|
||||
.addTransition(
|
||||
RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
|
||||
RMAppAttemptEventType.EXPIRE,
|
||||
|
@ -302,7 +304,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
RMAppAttemptState.RUNNING,
|
||||
EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
|
||||
RMAppAttemptEventType.CONTAINER_FINISHED,
|
||||
new ContainerFinishedTransition())
|
||||
new ContainerFinishedTransition(
|
||||
new AMContainerCrashedAtRunningTransition(),
|
||||
RMAppAttemptState.RUNNING))
|
||||
.addTransition(
|
||||
RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
|
||||
RMAppAttemptEventType.EXPIRE,
|
||||
|
@ -904,6 +908,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
}
|
||||
return appAttempt.recoveredFinalState;
|
||||
} else {
|
||||
// Add the current attempt to the scheduler.
|
||||
if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
|
||||
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
|
||||
appAttempt.getAppAttemptId(), false));
|
||||
}
|
||||
|
||||
/*
|
||||
* Since the application attempt's final state is not saved that means
|
||||
* for AM container (previous attempt) state must be one of these.
|
||||
|
@ -1207,17 +1217,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
}
|
||||
}
|
||||
|
||||
private static final class AMContainerCrashedTransition extends
|
||||
private static final class AMContainerCrashedBeforeRunningTransition extends
|
||||
BaseFinalTransition {
|
||||
|
||||
public AMContainerCrashedTransition() {
|
||||
public AMContainerCrashedBeforeRunningTransition() {
|
||||
super(RMAppAttemptState.FAILED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
|
||||
RMAppAttemptContainerFinishedEvent finishEvent =
|
||||
((RMAppAttemptContainerFinishedEvent)event);
|
||||
|
||||
|
@ -1410,6 +1419,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
implements
|
||||
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
|
||||
|
||||
// The transition To Do after attempt final state is saved.
|
||||
private BaseTransition transitionToDo;
|
||||
private RMAppAttemptState currentState;
|
||||
|
||||
public ContainerFinishedTransition(BaseTransition transitionToDo,
|
||||
RMAppAttemptState currentState) {
|
||||
this.transitionToDo = transitionToDo;
|
||||
this.currentState = currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
|
@ -1426,14 +1445,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
containerStatus.getContainerId())) {
|
||||
// Remember the follow up transition and save the final attempt state.
|
||||
appAttempt.rememberTargetTransitionsAndStoreState(event,
|
||||
new ContainerFinishedFinalStateSavedTransition(),
|
||||
RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
|
||||
transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
|
||||
return RMAppAttemptState.FINAL_SAVING;
|
||||
}
|
||||
|
||||
// Normal container. Put it in the completed containers list.
|
||||
appAttempt.justFinishedContainers.add(containerStatus);
|
||||
return RMAppAttemptState.RUNNING;
|
||||
return this.currentState;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1451,7 +1469,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
}
|
||||
}
|
||||
|
||||
private static class ContainerFinishedFinalStateSavedTransition extends
|
||||
private static class AMContainerCrashedAtRunningTransition extends
|
||||
BaseTransition {
|
||||
@Override
|
||||
public void
|
||||
|
|
|
@ -33,5 +33,7 @@ public enum RMContainerEventType {
|
|||
RELEASED,
|
||||
|
||||
// Source: ContainerAllocationExpirer
|
||||
EXPIRE
|
||||
EXPIRE,
|
||||
|
||||
RECOVER
|
||||
}
|
||||
|
|
|
@ -35,12 +35,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
||||
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.StateMachine;
|
||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||
|
@ -65,6 +67,9 @@ public class RMContainerImpl implements RMContainer {
|
|||
RMContainerEventType.KILL)
|
||||
.addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
|
||||
RMContainerEventType.RESERVED, new ContainerReservedTransition())
|
||||
.addTransition(RMContainerState.NEW,
|
||||
EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
|
||||
RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
|
||||
|
||||
// Transitions from RESERVED state
|
||||
.addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
|
||||
|
@ -341,6 +346,38 @@ public class RMContainerImpl implements RMContainer {
|
|||
}
|
||||
}
|
||||
|
||||
private static final class ContainerRecoveredTransition
|
||||
implements
|
||||
MultipleArcTransition<RMContainerImpl, RMContainerEvent, RMContainerState> {
|
||||
@Override
|
||||
public RMContainerState transition(RMContainerImpl container,
|
||||
RMContainerEvent event) {
|
||||
NMContainerStatus report =
|
||||
((RMContainerRecoverEvent) event).getContainerReport();
|
||||
if (report.getContainerState().equals(ContainerState.COMPLETE)) {
|
||||
ContainerStatus status =
|
||||
ContainerStatus.newInstance(report.getContainerId(),
|
||||
report.getContainerState(), report.getDiagnostics(),
|
||||
report.getContainerExitStatus());
|
||||
|
||||
new FinishedTransition().transition(container,
|
||||
new RMContainerFinishedEvent(container.containerId, status,
|
||||
RMContainerEventType.FINISHED));
|
||||
return RMContainerState.COMPLETED;
|
||||
} else if (report.getContainerState().equals(ContainerState.RUNNING)) {
|
||||
// Tell the appAttempt
|
||||
container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
|
||||
container.getApplicationAttemptId(), container.getContainer()));
|
||||
return RMContainerState.RUNNING;
|
||||
} else {
|
||||
// This can never happen.
|
||||
LOG.warn("RMContainer received unexpected recover event with container"
|
||||
+ " state " + report.getContainerState() + " while recovering.");
|
||||
return RMContainerState.RUNNING;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class ContainerReservedTransition extends
|
||||
BaseTransition {
|
||||
|
||||
|
@ -398,7 +435,6 @@ public class RMContainerImpl implements RMContainer {
|
|||
// Inform AppAttempt
|
||||
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
|
||||
container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
|
||||
|
||||
container.rmContext.getRMApplicationHistoryWriter()
|
||||
.containerFinished(container);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
|
||||
public class RMContainerRecoverEvent extends RMContainerEvent {
|
||||
|
||||
private final NMContainerStatus containerReport;
|
||||
|
||||
public RMContainerRecoverEvent(ContainerId containerId,
|
||||
NMContainerStatus containerReport) {
|
||||
super(containerId, RMContainerEventType.RECOVER);
|
||||
this.containerReport = containerReport;
|
||||
}
|
||||
|
||||
public NMContainerStatus getContainerReport() {
|
||||
return containerReport;
|
||||
}
|
||||
}
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||
|
@ -460,12 +461,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
@Override
|
||||
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||
// Inform the scheduler
|
||||
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeAddedSchedulerEvent(rmNode));
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodesListManagerEvent(
|
||||
NodesListManagerEventType.NODE_USABLE, rmNode));
|
||||
RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
|
||||
List<NMContainerStatus> containers = null;
|
||||
|
||||
String host = rmNode.nodeId.getHost();
|
||||
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
|
||||
|
@ -476,7 +473,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
} else {
|
||||
// Increment activeNodes explicitly because this is a new node.
|
||||
ClusterMetrics.getMetrics().incrNumActiveNodes();
|
||||
containers = startEvent.getContainerRecoveryReports();
|
||||
}
|
||||
|
||||
rmNode.context.getDispatcher().getEventHandler()
|
||||
.handle(new NodeAddedSchedulerEvent(rmNode, containers));
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodesListManagerEvent(
|
||||
NodesListManagerEventType.NODE_USABLE, rmNode));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -513,7 +517,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
}
|
||||
rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
|
||||
new RMNodeStartedEvent(newNode.getNodeID(), null));
|
||||
}
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodesListManagerEvent(
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
|
||||
public class RMNodeStartedEvent extends RMNodeEvent {
|
||||
|
||||
private List<NMContainerStatus> containerReports;
|
||||
|
||||
public RMNodeStartedEvent(NodeId nodeId, List<NMContainerStatus> containerReports) {
|
||||
super(nodeId, RMNodeEventType.STARTED);
|
||||
this.containerReports = containerReports;
|
||||
}
|
||||
|
||||
public List<NMContainerStatus> getContainerRecoveryReports() {
|
||||
return this.containerReports;
|
||||
}
|
||||
}
|
|
@ -32,14 +32,21 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public abstract class AbstractYarnScheduler
|
||||
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
|
||||
extends AbstractService implements ResourceScheduler {
|
||||
|
@ -47,8 +54,7 @@ public abstract class AbstractYarnScheduler
|
|||
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
|
||||
|
||||
// Nodes in the cluster, indexed by NodeId
|
||||
protected Map<NodeId, N> nodes =
|
||||
new ConcurrentHashMap<NodeId, N>();
|
||||
protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
|
||||
|
||||
// Whole capacity of the cluster
|
||||
protected Resource clusterResource = Resource.newInstance(0, 0);
|
||||
|
@ -58,6 +64,7 @@ public abstract class AbstractYarnScheduler
|
|||
|
||||
protected RMContext rmContext;
|
||||
protected Map<ApplicationId, SchedulerApplication<T>> applications;
|
||||
|
||||
protected final static List<Container> EMPTY_CONTAINER_LIST =
|
||||
new ArrayList<Container>();
|
||||
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
|
||||
|
@ -169,4 +176,90 @@ public abstract class AbstractYarnScheduler
|
|||
throw new YarnException(getClass().getSimpleName()
|
||||
+ " does not support moving apps between queues");
|
||||
}
|
||||
|
||||
private void killOrphanContainerOnNode(RMNode node,
|
||||
NMContainerStatus container) {
|
||||
if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMNodeCleanContainerEvent(node.getNodeID(),
|
||||
container.getContainerId()));
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void recoverContainersOnNode(
|
||||
List<NMContainerStatus> containerReports, RMNode nm) {
|
||||
if (!rmContext.isWorkPreservingRecoveryEnabled()
|
||||
|| containerReports == null
|
||||
|| (containerReports != null && containerReports.isEmpty())) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (NMContainerStatus container : containerReports) {
|
||||
ApplicationId appId =
|
||||
container.getContainerId().getApplicationAttemptId().getApplicationId();
|
||||
RMApp rmApp = rmContext.getRMApps().get(appId);
|
||||
if (rmApp == null) {
|
||||
LOG.error("Skip recovering container " + container
|
||||
+ " for unknown application.");
|
||||
killOrphanContainerOnNode(nm, container);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Unmanaged AM recovery is addressed in YARN-1815
|
||||
if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
|
||||
LOG.info("Skip recovering container " + container + " for unmanaged AM."
|
||||
+ rmApp.getApplicationId());
|
||||
killOrphanContainerOnNode(nm, container);
|
||||
continue;
|
||||
}
|
||||
|
||||
SchedulerApplication<T> schedulerApp = applications.get(appId);
|
||||
if (schedulerApp == null) {
|
||||
LOG.info("Skip recovering container " + container
|
||||
+ " for unknown SchedulerApplication. Application current state is "
|
||||
+ rmApp.getState());
|
||||
killOrphanContainerOnNode(nm, container);
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG.info("Recovering container " + container);
|
||||
SchedulerApplicationAttempt schedulerAttempt =
|
||||
schedulerApp.getCurrentAppAttempt();
|
||||
|
||||
// create container
|
||||
RMContainer rmContainer = recoverAndCreateContainer(container, nm);
|
||||
|
||||
// recover RMContainer
|
||||
rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
|
||||
container));
|
||||
|
||||
// recover scheduler node
|
||||
nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
|
||||
|
||||
// recover queue: update headroom etc.
|
||||
Queue queue = schedulerAttempt.getQueue();
|
||||
queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
|
||||
|
||||
// recover scheduler attempt
|
||||
schedulerAttempt.recoverContainer(rmContainer);
|
||||
}
|
||||
}
|
||||
|
||||
private RMContainer recoverAndCreateContainer(NMContainerStatus report,
|
||||
RMNode node) {
|
||||
Container container =
|
||||
Container.newInstance(report.getContainerId(), node.getNodeID(),
|
||||
node.getHttpAddress(), report.getAllocatedResource(),
|
||||
report.getPriority(), null);
|
||||
ApplicationAttemptId attemptId =
|
||||
container.getId().getApplicationAttemptId();
|
||||
RMContainer rmContainer =
|
||||
new RMContainerImpl(container, attemptId, node.getNodeID(),
|
||||
applications.get(attemptId.getApplicationId()).getUser(), rmContext);
|
||||
return rmContainer;
|
||||
}
|
||||
|
||||
public SchedulerNode getSchedulerNode(NodeId nodeId) {
|
||||
return nodes.get(nodeId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
/**
|
||||
|
@ -409,4 +411,25 @@ public class AppSchedulingInfo {
|
|||
// this.requests = appInfo.getRequests();
|
||||
this.blacklist = appInfo.getBlackList();
|
||||
}
|
||||
|
||||
public synchronized void recoverContainer(RMContainer rmContainer) {
|
||||
// ContainerIdCounter on recovery will be addressed in YARN-2052
|
||||
this.containerIdCounter.incrementAndGet();
|
||||
|
||||
QueueMetrics metrics = queue.getMetrics();
|
||||
if (pending) {
|
||||
// If there was any container to recover, the application was
|
||||
// running from scheduler's POV.
|
||||
pending = false;
|
||||
metrics.runAppAttempt(applicationId, user);
|
||||
}
|
||||
|
||||
// Container is completed. Skip recovering resources.
|
||||
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
||||
return;
|
||||
}
|
||||
|
||||
metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,8 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
||||
@Evolving
|
||||
@LimitedPrivate("yarn")
|
||||
|
@ -60,4 +62,13 @@ public interface Queue {
|
|||
boolean hasAccess(QueueACL acl, UserGroupInformation user);
|
||||
|
||||
public ActiveUsersManager getActiveUsersManager();
|
||||
|
||||
/**
|
||||
* Recover the state of the queue for a given container.
|
||||
* @param clusterResource the resource of the cluster
|
||||
* @param schedulerAttempt the application for which the container was allocated
|
||||
* @param rmContainer the container that was recovered.
|
||||
*/
|
||||
public void recoverContainer(Resource clusterResource,
|
||||
SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer);
|
||||
}
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
|
@ -535,4 +536,23 @@ public class SchedulerApplicationAttempt {
|
|||
appSchedulingInfo.move(newQueue);
|
||||
this.queue = newQueue;
|
||||
}
|
||||
|
||||
public synchronized void recoverContainer(RMContainer rmContainer) {
|
||||
// recover app scheduling info
|
||||
appSchedulingInfo.recoverContainer(rmContainer);
|
||||
|
||||
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
||||
return;
|
||||
}
|
||||
LOG.info("SchedulerAttempt " + getApplicationAttemptId()
|
||||
+ " is recovering container " + rmContainer.getContainerId());
|
||||
liveContainers.put(rmContainer.getContainerId(), rmContainer);
|
||||
Resources.addTo(currentConsumption, rmContainer.getContainer()
|
||||
.getResource());
|
||||
// resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
|
||||
// is called.
|
||||
// newlyAllocatedContainers.add(rmContainer);
|
||||
// schedulingOpportunities
|
||||
// lastScheduledContainer
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
|
@ -35,10 +34,10 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Represents a YARN Cluster Node from the viewpoint of the scheduler.
|
||||
|
@ -119,13 +118,10 @@ public abstract class SchedulerNode {
|
|||
* The Scheduler has allocated containers on this node to the given
|
||||
* application.
|
||||
*
|
||||
* @param applicationId
|
||||
* application
|
||||
* @param rmContainer
|
||||
* allocated container
|
||||
*/
|
||||
public synchronized void allocateContainer(ApplicationId applicationId,
|
||||
RMContainer rmContainer) {
|
||||
public synchronized void allocateContainer(RMContainer rmContainer) {
|
||||
Container container = rmContainer.getContainer();
|
||||
deductAvailableResource(container.getResource());
|
||||
++numContainers;
|
||||
|
@ -166,8 +162,8 @@ public abstract class SchedulerNode {
|
|||
return this.totalResourceCapability;
|
||||
}
|
||||
|
||||
private synchronized boolean isValidContainer(Container c) {
|
||||
if (launchedContainers.containsKey(c.getId())) {
|
||||
public synchronized boolean isValidContainer(ContainerId containerId) {
|
||||
if (launchedContainers.containsKey(containerId)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -185,7 +181,7 @@ public abstract class SchedulerNode {
|
|||
* container to be released
|
||||
*/
|
||||
public synchronized void releaseContainer(Container container) {
|
||||
if (!isValidContainer(container)) {
|
||||
if (!isValidContainer(container.getId())) {
|
||||
LOG.error("Invalid container released " + container);
|
||||
return;
|
||||
}
|
||||
|
@ -274,4 +270,12 @@ public abstract class SchedulerNode {
|
|||
// we can only adjust available resource if total resource is changed.
|
||||
Resources.addTo(this.availableResource, deltaResource);
|
||||
}
|
||||
|
||||
|
||||
public synchronized void recoverContainer(RMContainer rmContainer) {
|
||||
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
||||
return;
|
||||
}
|
||||
allocateContainer(rmContainer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.security.AccessControlException;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
|
@ -234,15 +233,6 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
|
|||
*/
|
||||
public ActiveUsersManager getActiveUsersManager();
|
||||
|
||||
/**
|
||||
* Recover the state of the queue
|
||||
* @param clusterResource the resource of the cluster
|
||||
* @param application the application for which the container was allocated
|
||||
* @param container the container that was recovered.
|
||||
*/
|
||||
public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application,
|
||||
Container container);
|
||||
|
||||
/**
|
||||
* Adds all applications in the queue and its subqueues to the given collection.
|
||||
* @param apps the collection to add the applications to
|
||||
|
|
|
@ -872,6 +872,8 @@ public class CapacityScheduler extends
|
|||
{
|
||||
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
|
||||
addNode(nodeAddedEvent.getAddedRMNode());
|
||||
recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
|
||||
nodeAddedEvent.getAddedRMNode());
|
||||
}
|
||||
break;
|
||||
case NODE_REMOVED:
|
||||
|
|
|
@ -59,15 +59,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class LeafQueue implements CSQueue {
|
||||
|
@ -564,7 +566,8 @@ public class LeafQueue implements CSQueue {
|
|||
"numContainers=" + getNumContainers();
|
||||
}
|
||||
|
||||
private synchronized User getUser(String userName) {
|
||||
@VisibleForTesting
|
||||
public synchronized User getUser(String userName) {
|
||||
User user = users.get(userName);
|
||||
if (user == null) {
|
||||
user = new User();
|
||||
|
@ -1346,8 +1349,7 @@ public class LeafQueue implements CSQueue {
|
|||
}
|
||||
|
||||
// Inform the node
|
||||
node.allocateContainer(application.getApplicationId(),
|
||||
allocatedContainer);
|
||||
node.allocateContainer(allocatedContainer);
|
||||
|
||||
LOG.info("assignedContainer" +
|
||||
" application attempt=" + application.getApplicationAttemptId() +
|
||||
|
@ -1446,7 +1448,7 @@ public class LeafQueue implements CSQueue {
|
|||
}
|
||||
|
||||
synchronized void allocateResource(Resource clusterResource,
|
||||
FiCaSchedulerApp application, Resource resource) {
|
||||
SchedulerApplicationAttempt application, Resource resource) {
|
||||
// Update queue metrics
|
||||
Resources.addTo(usedResources, resource);
|
||||
CSQueueUtils.updateQueueStatistics(
|
||||
|
@ -1530,7 +1532,8 @@ public class LeafQueue implements CSQueue {
|
|||
return metrics;
|
||||
}
|
||||
|
||||
static class User {
|
||||
@VisibleForTesting
|
||||
public static class User {
|
||||
Resource consumed = Resources.createResource(0, 0);
|
||||
int pendingApplications = 0;
|
||||
int activeApplications = 0;
|
||||
|
@ -1580,13 +1583,16 @@ public class LeafQueue implements CSQueue {
|
|||
|
||||
@Override
|
||||
public void recoverContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, Container container) {
|
||||
SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
|
||||
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
||||
return;
|
||||
}
|
||||
// Careful! Locking order is important!
|
||||
synchronized (this) {
|
||||
allocateResource(clusterResource, application, container.getResource());
|
||||
allocateResource(clusterResource, attempt, rmContainer.getContainer()
|
||||
.getResource());
|
||||
}
|
||||
getParent().recoverContainer(clusterResource, application, container);
|
||||
|
||||
getParent().recoverContainer(clusterResource, attempt, rmContainer);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1613,5 +1619,4 @@ public class LeafQueue implements CSQueue {
|
|||
apps.add(app.getApplicationAttemptId());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
|
@ -49,9 +48,11 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
|
@ -770,13 +771,16 @@ public class ParentQueue implements CSQueue {
|
|||
|
||||
@Override
|
||||
public void recoverContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, Container container) {
|
||||
SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
|
||||
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
||||
return;
|
||||
}
|
||||
// Careful! Locking order is important!
|
||||
synchronized (this) {
|
||||
allocateResource(clusterResource, container.getResource());
|
||||
allocateResource(clusterResource,rmContainer.getContainer().getResource());
|
||||
}
|
||||
if (parent != null) {
|
||||
parent.recoverContainer(clusterResource, application, container);
|
||||
parent.recoverContainer(clusterResource, attempt, rmContainer);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
|
|
|
@ -18,19 +18,34 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
|
||||
public class NodeAddedSchedulerEvent extends SchedulerEvent {
|
||||
|
||||
private final RMNode rmNode;
|
||||
private final List<NMContainerStatus> containerReports;
|
||||
|
||||
public NodeAddedSchedulerEvent(RMNode rmNode) {
|
||||
super(SchedulerEventType.NODE_ADDED);
|
||||
this.rmNode = rmNode;
|
||||
this.containerReports = null;
|
||||
}
|
||||
|
||||
public NodeAddedSchedulerEvent(RMNode rmNode,
|
||||
List<NMContainerStatus> containerReports) {
|
||||
super(SchedulerEventType.NODE_ADDED);
|
||||
this.rmNode = rmNode;
|
||||
this.containerReports = containerReports;
|
||||
}
|
||||
|
||||
public RMNode getAddedRMNode() {
|
||||
return rmNode;
|
||||
}
|
||||
|
||||
public List<NMContainerStatus> getContainerReports() {
|
||||
return containerReports;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -264,8 +264,7 @@ public class AppSchedulable extends Schedulable {
|
|||
}
|
||||
|
||||
// Inform the node
|
||||
node.allocateContainer(app.getApplicationId(),
|
||||
allocatedContainer);
|
||||
node.allocateContainer(allocatedContainer);
|
||||
|
||||
// If this container is used to run AM, update the leaf queue's AM usage
|
||||
if (app.getLiveContainers().size() == 1 &&
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
@Private
|
||||
|
@ -318,4 +319,10 @@ public class FSLeafQueue extends FSQueue {
|
|||
Resources.addTo(amResourceUsage, amResource);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recoverContainer(Resource clusterResource,
|
||||
SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
|
||||
// TODO: not implemented yet -- this queue currently does nothing for a recovered container.
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,9 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
|
@ -228,4 +230,11 @@ public class FSParentQueue extends FSQueue {
|
|||
// Should never be called since all applications are submitted to LeafQueues
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recoverContainer(Resource clusterResource,
|
||||
SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
|
||||
// TODO: not implemented yet -- this queue currently does nothing for a recovered container.
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
|
@ -178,6 +179,17 @@ public class FifoScheduler extends
|
|||
public ActiveUsersManager getActiveUsersManager() {
|
||||
return activeUsersManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recoverContainer(Resource clusterResource,
|
||||
SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
|
||||
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
||||
return;
|
||||
}
|
||||
increaseUsedResources(rmContainer);
|
||||
updateAppHeadRoom(schedulerAttempt);
|
||||
updateAvailableResourcesMetrics();
|
||||
}
|
||||
};
|
||||
|
||||
public FifoScheduler() {
|
||||
|
@ -488,7 +500,7 @@ public class FifoScheduler extends
|
|||
if (attempt == null) {
|
||||
continue;
|
||||
}
|
||||
attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
|
||||
updateAppHeadRoom(attempt);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -659,11 +671,10 @@ public class FifoScheduler extends
|
|||
application.allocate(type, node, priority, request, container);
|
||||
|
||||
// Inform the node
|
||||
node.allocateContainer(application.getApplicationId(),
|
||||
rmContainer);
|
||||
node.allocateContainer(rmContainer);
|
||||
|
||||
// Update usage for this container
|
||||
Resources.addTo(usedResource, capability);
|
||||
increaseUsedResources(rmContainer);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -708,8 +719,21 @@ public class FifoScheduler extends
|
|||
+ node.getAvailableResource());
|
||||
}
|
||||
|
||||
metrics.setAvailableResourcesToQueue(
|
||||
Resources.subtract(clusterResource, usedResource));
|
||||
updateAvailableResourcesMetrics();
|
||||
}
|
||||
|
||||
private void increaseUsedResources(RMContainer rmContainer) {
|
||||
Resources.addTo(usedResource, rmContainer.getAllocatedResource());
|
||||
}
|
||||
|
||||
private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
|
||||
schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
|
||||
usedResource));
|
||||
}
|
||||
|
||||
private void updateAvailableResourcesMetrics() {
|
||||
metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
|
||||
usedResource));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -719,6 +743,9 @@ public class FifoScheduler extends
|
|||
{
|
||||
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
|
||||
addNode(nodeAddedEvent.getAddedRMNode());
|
||||
recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
|
||||
nodeAddedEvent.getAddedRMNode());
|
||||
|
||||
}
|
||||
break;
|
||||
case NODE_REMOVED:
|
||||
|
@ -923,4 +950,8 @@ public class FifoScheduler extends
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public Resource getUsedResource() {
|
||||
return usedResource;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,10 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
|
@ -47,12 +46,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
|
@ -69,6 +70,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
|
@ -76,6 +81,7 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public class MockRM extends ResourceManager {
|
||||
|
@ -144,11 +150,26 @@ public class MockRM extends ResourceManager {
|
|||
}
|
||||
}
|
||||
|
||||
public void waitForContainerToComplete(RMAppAttempt attempt,
|
||||
NMContainerStatus completedContainer) throws InterruptedException {
|
||||
while (true) {
|
||||
List<ContainerStatus> containers = attempt.getJustFinishedContainers();
|
||||
System.out.println("Received completed containers " + containers);
|
||||
for (ContainerStatus container : containers) {
|
||||
if (container.getContainerId().equals(
|
||||
completedContainer.getContainerId())) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
Thread.sleep(200);
|
||||
}
|
||||
}
|
||||
|
||||
public void waitForState(MockNM nm, ContainerId containerId,
|
||||
RMContainerState containerState) throws Exception {
|
||||
RMContainer container = getResourceScheduler().getRMContainer(containerId);
|
||||
int timeoutSecs = 0;
|
||||
while(container == null && timeoutSecs++ < 20) {
|
||||
while(container == null && timeoutSecs++ < 100) {
|
||||
nm.nodeHeartbeat(true);
|
||||
container = getResourceScheduler().getRMContainer(containerId);
|
||||
System.out.println("Waiting for container " + containerId + " to be allocated.");
|
||||
|
@ -333,7 +354,7 @@ public class MockRM extends ResourceManager {
|
|||
public void sendNodeStarted(MockNM nm) throws Exception {
|
||||
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
|
||||
nm.getNodeId());
|
||||
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED));
|
||||
node.handle(new RMNodeStartedEvent(nm.getNodeId(), null));
|
||||
}
|
||||
|
||||
public void sendNodeLost(MockNM nm) throws Exception {
|
||||
|
@ -542,4 +563,12 @@ public class MockRM extends ResourceManager {
|
|||
.newInstance(appId));
|
||||
return response.getApplicationReport();
|
||||
}
|
||||
|
||||
// Explicitly reset queue metrics for testing.
|
||||
@SuppressWarnings("static-access")
|
||||
public void clearQueueMetrics(RMApp app) {
|
||||
((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) getResourceScheduler())
|
||||
.getSchedulerApplications().get(app.getApplicationId()).getQueue()
|
||||
.getMetrics().clearQueueMetrics();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,16 +18,16 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
|
@ -56,11 +57,10 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TestFifoScheduler {
|
||||
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
|
||||
|
@ -298,7 +298,10 @@ public class TestFifoScheduler {
|
|||
FifoScheduler fs = new FifoScheduler();
|
||||
fs.init(conf);
|
||||
fs.start();
|
||||
// mock rmContext to avoid NPE.
|
||||
RMContext context = mock(RMContext.class);
|
||||
fs.reinitialize(conf, null);
|
||||
fs.setRMContext(context);
|
||||
|
||||
RMNode n1 =
|
||||
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2");
|
||||
|
|
|
@ -43,10 +43,11 @@ import org.junit.Test;
|
|||
public class TestMoveApplication {
|
||||
private ResourceManager resourceManager = null;
|
||||
private static boolean failMove;
|
||||
private Configuration conf;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class,
|
||||
FifoSchedulerWithMove.class);
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, " ");
|
||||
|
@ -119,26 +120,21 @@ public class TestMoveApplication {
|
|||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 5000)
|
||||
public void testMoveSuccessful() throws Exception {
|
||||
// Submit application
|
||||
Application application = new Application("user1", resourceManager);
|
||||
ApplicationId appId = application.getApplicationId();
|
||||
application.submit();
|
||||
|
||||
// Wait for app to be accepted
|
||||
RMApp app = resourceManager.rmContext.getRMApps().get(appId);
|
||||
while (app.getState() != RMAppState.ACCEPTED) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
ClientRMService clientRMService = resourceManager.getClientRMService();
|
||||
@Test (timeout = 10000)
|
||||
public
|
||||
void testMoveSuccessful() throws Exception {
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
RMApp app = rm1.submitApp(1024);
|
||||
ClientRMService clientRMService = rm1.getClientRMService();
|
||||
// FIFO scheduler does not support moves
|
||||
clientRMService.moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
|
||||
clientRMService
|
||||
.moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest
|
||||
.newInstance(app.getApplicationId(), "newqueue"));
|
||||
|
||||
RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
|
||||
RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId());
|
||||
assertEquals("newqueue", rmApp.getQueue());
|
||||
rm1.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -21,15 +21,14 @@ import static org.mockito.Matchers.any;
|
|||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.util.HostsFileReader;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
|
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
|
|||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
|
@ -160,7 +161,7 @@ public class TestRMNodeTransitions {
|
|||
@Test (timeout = 5000)
|
||||
public void testExpiredContainer() {
|
||||
// Start the node
|
||||
node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
|
||||
node.handle(new RMNodeStartedEvent(null, null));
|
||||
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
|
||||
|
||||
// Expire a container
|
||||
|
@ -188,11 +189,11 @@ public class TestRMNodeTransitions {
|
|||
@Test (timeout = 5000)
|
||||
public void testContainerUpdate() throws InterruptedException{
|
||||
//Start the node
|
||||
node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
|
||||
node.handle(new RMNodeStartedEvent(null, null));
|
||||
|
||||
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
|
||||
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
|
||||
node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
|
||||
node2.handle(new RMNodeStartedEvent(null, null));
|
||||
|
||||
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
|
||||
BuilderUtils.newApplicationAttemptId(
|
||||
|
@ -248,7 +249,7 @@ public class TestRMNodeTransitions {
|
|||
@Test (timeout = 5000)
|
||||
public void testStatusChange(){
|
||||
//Start the node
|
||||
node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
|
||||
node.handle(new RMNodeStartedEvent(null, null));
|
||||
//Add info to the queue first
|
||||
node.setNextHeartBeat(false);
|
||||
|
||||
|
@ -464,7 +465,7 @@ public class TestRMNodeTransitions {
|
|||
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
|
||||
null, ResourceOption.newInstance(capability,
|
||||
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
|
||||
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
|
||||
node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
|
||||
Assert.assertEquals(NodeState.RUNNING, node.getState());
|
||||
return node;
|
||||
}
|
||||
|
@ -495,7 +496,7 @@ public class TestRMNodeTransitions {
|
|||
int initialUnhealthy = cm.getUnhealthyNMs();
|
||||
int initialDecommissioned = cm.getNumDecommisionedNMs();
|
||||
int initialRebooted = cm.getNumRebootedNMs();
|
||||
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
|
||||
node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
|
||||
Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
|
||||
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
|
||||
Assert.assertEquals("Unhealthy Nodes",
|
||||
|
|
|
@ -0,0 +1,570 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class TestWorkPreservingRMRestart {
|
||||
|
||||
private YarnConfiguration conf;
|
||||
private Class<?> schedulerClass;
|
||||
MockRM rm1 = null;
|
||||
MockRM rm2 = null;
|
||||
|
||||
@Before
|
||||
public void setup() throws UnknownHostException {
|
||||
Logger rootLogger = LogManager.getRootLogger();
|
||||
rootLogger.setLevel(Level.DEBUG);
|
||||
conf = new YarnConfiguration();
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerClass,
|
||||
ResourceScheduler.class);
|
||||
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
|
||||
DefaultMetricsSystem.setMiniClusterMode(true);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
if (rm1 != null) {
|
||||
rm1.stop();
|
||||
}
|
||||
if (rm2 != null) {
|
||||
rm2.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<Object[]> getTestParameters() {
|
||||
return Arrays.asList(new Object[][] { { CapacityScheduler.class },
|
||||
{ FifoScheduler.class } });
|
||||
}
|
||||
|
||||
public TestWorkPreservingRMRestart(Class<?> schedulerClass) {
|
||||
this.schedulerClass = schedulerClass;
|
||||
}
|
||||
|
||||
// Test common scheduler state including SchedulerAttempt, SchedulerNode,
|
||||
// AppSchedulingInfo can be reconstructed via the container recovery reports
|
||||
// on NM re-registration.
|
||||
// Also test scheduler specific changes: i.e. Queue recovery-
|
||||
// CSQueue/FSQueue/FifoQueue recovery respectively.
|
||||
// Test Strategy: send 3 container recovery reports(AMContainer, running
|
||||
// container, completed container) on NM re-registration, check the states of
|
||||
// SchedulerAttempt, SchedulerNode etc. are updated accordingly.
|
||||
@Test(timeout = 20000)
|
||||
public void testSchedulerRecovery() throws Exception {
|
||||
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
||||
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||
DominantResourceCalculator.class.getName());
|
||||
|
||||
int containerMemory = 1024;
|
||||
Resource containerResource = Resource.newInstance(containerMemory, 1);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
// clear queue metrics
|
||||
rm1.clearQueueMetrics(app1);
|
||||
|
||||
// Re-start RM
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
// recover app
|
||||
RMApp recoveredApp1 =
|
||||
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
|
||||
RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt();
|
||||
NMContainerStatus amContainer =
|
||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1,
|
||||
ContainerState.RUNNING);
|
||||
NMContainerStatus runningContainer =
|
||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
|
||||
ContainerState.RUNNING);
|
||||
NMContainerStatus completedContainer =
|
||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
|
||||
ContainerState.COMPLETE);
|
||||
|
||||
nm1.registerNode(Arrays.asList(amContainer, runningContainer,
|
||||
completedContainer));
|
||||
|
||||
// Wait for RM to settle down on recovering containers;
|
||||
waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
|
||||
|
||||
// check RMContainers are re-recreated and the container state is correct.
|
||||
rm2.waitForState(nm1, amContainer.getContainerId(),
|
||||
RMContainerState.RUNNING);
|
||||
rm2.waitForState(nm1, runningContainer.getContainerId(),
|
||||
RMContainerState.RUNNING);
|
||||
rm2.waitForContainerToComplete(loadedAttempt1, completedContainer);
|
||||
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm2.getResourceScheduler();
|
||||
SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId());
|
||||
|
||||
// ********* check scheduler node state.*******
|
||||
// 2 running containers.
|
||||
Resource usedResources = Resources.multiply(containerResource, 2);
|
||||
Resource nmResource =
|
||||
Resource.newInstance(nm1.getMemory(), nm1.getvCores());
|
||||
|
||||
assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
|
||||
assertTrue(schedulerNode1.isValidContainer(runningContainer
|
||||
.getContainerId()));
|
||||
assertFalse(schedulerNode1.isValidContainer(completedContainer
|
||||
.getContainerId()));
|
||||
// 2 launched containers, 1 completed container
|
||||
assertEquals(2, schedulerNode1.getNumContainers());
|
||||
|
||||
assertEquals(Resources.subtract(nmResource, usedResources),
|
||||
schedulerNode1.getAvailableResource());
|
||||
assertEquals(usedResources, schedulerNode1.getUsedResource());
|
||||
Resource availableResources = Resources.subtract(nmResource, usedResources);
|
||||
|
||||
// ***** check queue state based on the underlying scheduler ********
|
||||
Map<ApplicationId, SchedulerApplication> schedulerApps =
|
||||
((AbstractYarnScheduler) rm2.getResourceScheduler())
|
||||
.getSchedulerApplications();
|
||||
SchedulerApplication schedulerApp =
|
||||
schedulerApps.get(recoveredApp1.getApplicationId());
|
||||
|
||||
if (schedulerClass.equals(CapacityScheduler.class)) {
|
||||
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
||||
} else if (schedulerClass.equals(FifoScheduler.class)) {
|
||||
checkFifoQueue(schedulerApp, usedResources, availableResources);
|
||||
}
|
||||
|
||||
// *********** check scheduler attempt state.********
|
||||
SchedulerApplicationAttempt schedulerAttempt =
|
||||
schedulerApp.getCurrentAppAttempt();
|
||||
assertTrue(schedulerAttempt.getLiveContainers().contains(
|
||||
scheduler.getRMContainer(amContainer.getContainerId())));
|
||||
assertTrue(schedulerAttempt.getLiveContainers().contains(
|
||||
scheduler.getRMContainer(runningContainer.getContainerId())));
|
||||
assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
|
||||
assertEquals(availableResources, schedulerAttempt.getHeadroom());
|
||||
|
||||
// *********** check appSchedulingInfo state ***********
|
||||
assertEquals(4, schedulerAttempt.getNewContainerId());
|
||||
}
|
||||
|
||||
private void checkCSQueue(MockRM rm,
|
||||
SchedulerApplication<SchedulerApplicationAttempt> app,
|
||||
Resource clusterResource, Resource queueResource, Resource usedResource,
|
||||
int numContainers)
|
||||
throws Exception {
|
||||
checkCSLeafQueue(rm2, app, clusterResource, queueResource, usedResource,
|
||||
numContainers);
|
||||
|
||||
LeafQueue queue = (LeafQueue) app.getQueue();
|
||||
Resource availableResources = Resources.subtract(queueResource, usedResource);
|
||||
// ************* check Queue metrics ************
|
||||
QueueMetrics queueMetrics = queue.getMetrics();
|
||||
asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||
availableResources.getVirtualCores(), usedResource.getMemory(),
|
||||
usedResource.getVirtualCores());
|
||||
|
||||
// ************ check user metrics ***********
|
||||
QueueMetrics userMetrics =
|
||||
queueMetrics.getUserMetrics(app.getUser());
|
||||
asserteMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||
availableResources.getVirtualCores(), usedResource.getMemory(),
|
||||
usedResource.getVirtualCores());
|
||||
}
|
||||
|
||||
private void checkCSLeafQueue(MockRM rm,
|
||||
SchedulerApplication<SchedulerApplicationAttempt> app,
|
||||
Resource clusterResource, Resource queueResource, Resource usedResource,
|
||||
int numContainers) {
|
||||
LeafQueue leafQueue = (LeafQueue) app.getQueue();
|
||||
// assert queue used resources.
|
||||
assertEquals(usedResource, leafQueue.getUsedResources());
|
||||
assertEquals(numContainers, leafQueue.getNumContainers());
|
||||
|
||||
ResourceCalculator calc =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getResourceCalculator();
|
||||
float usedCapacity =
|
||||
Resources.divide(calc, clusterResource, usedResource, queueResource);
|
||||
// assert queue used capacity
|
||||
assertEquals(usedCapacity, leafQueue.getUsedCapacity(), 1e-8);
|
||||
float absoluteUsedCapacity =
|
||||
Resources.divide(calc, clusterResource, usedResource, clusterResource);
|
||||
// assert queue absolute capacity
|
||||
assertEquals(absoluteUsedCapacity, leafQueue.getAbsoluteUsedCapacity(),
|
||||
1e-8);
|
||||
// assert user consumed resources.
|
||||
assertEquals(usedResource, leafQueue.getUser(app.getUser())
|
||||
.getConsumedResources());
|
||||
}
|
||||
|
||||
private void checkFifoQueue(SchedulerApplication schedulerApp,
|
||||
Resource usedResources, Resource availableResources) throws Exception {
|
||||
FifoScheduler scheduler = (FifoScheduler) rm2.getResourceScheduler();
|
||||
// ************ check cluster used Resources ********
|
||||
assertEquals(usedResources, scheduler.getUsedResource());
|
||||
|
||||
// ************ check app headroom ****************
|
||||
SchedulerApplicationAttempt schedulerAttempt =
|
||||
schedulerApp.getCurrentAppAttempt();
|
||||
assertEquals(availableResources, schedulerAttempt.getHeadroom());
|
||||
|
||||
// ************ check queue metrics ****************
|
||||
QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
|
||||
asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||
availableResources.getVirtualCores(), usedResources.getMemory(),
|
||||
usedResources.getVirtualCores());
|
||||
}
|
||||
|
||||
// create 3 container reports for AM
|
||||
public static List<NMContainerStatus>
|
||||
createNMContainerStatusForApp(MockAM am) {
|
||||
List<NMContainerStatus> list =
|
||||
new ArrayList<NMContainerStatus>();
|
||||
NMContainerStatus amContainer =
|
||||
TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 1,
|
||||
ContainerState.RUNNING);
|
||||
NMContainerStatus runningContainer =
|
||||
TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 2,
|
||||
ContainerState.RUNNING);
|
||||
NMContainerStatus completedContainer =
|
||||
TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 3,
|
||||
ContainerState.COMPLETE);
|
||||
list.add(amContainer);
|
||||
list.add(runningContainer);
|
||||
list.add(completedContainer);
|
||||
return list;
|
||||
}
|
||||
|
||||
private static final String R = "Default";
|
||||
private static final String A = "QueueA";
|
||||
private static final String B = "QueueB";
|
||||
private static final String USER_1 = "user1";
|
||||
private static final String USER_2 = "user2";
|
||||
|
||||
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R });
|
||||
final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R;
|
||||
conf.setCapacity(Q_R, 100);
|
||||
final String Q_A = Q_R + "." + A;
|
||||
final String Q_B = Q_R + "." + B;
|
||||
conf.setQueues(Q_R, new String[] {A, B});
|
||||
conf.setCapacity(Q_A, 50);
|
||||
conf.setCapacity(Q_B, 50);
|
||||
conf.setDouble(CapacitySchedulerConfiguration
|
||||
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
|
||||
}
|
||||
|
||||
// Test CS recovery with multi-level queues and multi-users:
|
||||
// 1. setup 2 NMs each with 8GB memory;
|
||||
// 2. setup 2 level queues: Default -> (QueueA, QueueB)
|
||||
// 3. User1 submits 2 apps on QueueA
|
||||
// 4. User2 submits 1 app on QueueB
|
||||
// 5. AM and each container has 1GB memory
|
||||
// 6. Restart RM.
|
||||
// 7. nm1 re-syncs back containers belong to user1
|
||||
// 8. nm2 re-syncs back containers belong to user2.
|
||||
// 9. Assert the parent queue and 2 leaf queues state and the metrics.
|
||||
// 10. Assert each user's consumption inside the queue.
|
||||
@Test (timeout = 30000)
|
||||
public void testCapacitySchedulerRecovery() throws Exception {
|
||||
if (!schedulerClass.equals(CapacityScheduler.class)) {
|
||||
return;
|
||||
}
|
||||
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
||||
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||
DominantResourceCalculator.class.getName());
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
setupQueueConfiguration(csConf);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(csConf);
|
||||
rm1 = new MockRM(csConf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
MockNM nm2 =
|
||||
new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
nm2.registerNode();
|
||||
RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A);
|
||||
MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
|
||||
RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A);
|
||||
MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2);
|
||||
|
||||
RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B);
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
|
||||
|
||||
// clear queue metrics
|
||||
rm1.clearQueueMetrics(app1_1);
|
||||
rm1.clearQueueMetrics(app1_2);
|
||||
rm1.clearQueueMetrics(app2);
|
||||
|
||||
// Re-start RM
|
||||
rm2 = new MockRM(csConf, memStore);
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
nm2.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
|
||||
List<NMContainerStatus> am1_1Containers =
|
||||
createNMContainerStatusForApp(am1_1);
|
||||
List<NMContainerStatus> am1_2Containers =
|
||||
createNMContainerStatusForApp(am1_2);
|
||||
am1_1Containers.addAll(am1_2Containers);
|
||||
nm1.registerNode(am1_1Containers);
|
||||
|
||||
List<NMContainerStatus> am2Containers =
|
||||
createNMContainerStatusForApp(am2);
|
||||
nm2.registerNode(am2Containers);
|
||||
|
||||
// Wait for RM to settle down on recovering containers;
|
||||
waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
|
||||
waitForNumContainersToRecover(2, rm2, am1_2.getApplicationAttemptId());
|
||||
waitForNumContainersToRecover(2, rm2, am1_2.getApplicationAttemptId());
|
||||
|
||||
// Calculate each queue's resource usage.
|
||||
Resource containerResource = Resource.newInstance(1024, 1);
|
||||
Resource nmResource =
|
||||
Resource.newInstance(nm1.getMemory(), nm1.getvCores());
|
||||
Resource clusterResource = Resources.multiply(nmResource, 2);
|
||||
Resource q1Resource = Resources.multiply(clusterResource, 0.5);
|
||||
Resource q2Resource = Resources.multiply(clusterResource, 0.5);
|
||||
Resource q1UsedResource = Resources.multiply(containerResource, 4);
|
||||
Resource q2UsedResource = Resources.multiply(containerResource, 2);
|
||||
Resource totalUsedResource = Resources.add(q1UsedResource, q2UsedResource);
|
||||
Resource q1availableResources =
|
||||
Resources.subtract(q1Resource, q1UsedResource);
|
||||
Resource q2availableResources =
|
||||
Resources.subtract(q2Resource, q2UsedResource);
|
||||
Resource totalAvailableResource =
|
||||
Resources.add(q1availableResources, q2availableResources);
|
||||
|
||||
Map<ApplicationId, SchedulerApplication> schedulerApps =
|
||||
((AbstractYarnScheduler) rm2.getResourceScheduler())
|
||||
.getSchedulerApplications();
|
||||
SchedulerApplication schedulerApp1_1 =
|
||||
schedulerApps.get(app1_1.getApplicationId());
|
||||
|
||||
// assert queue A state.
|
||||
checkCSLeafQueue(rm2, schedulerApp1_1, clusterResource, q1Resource,
|
||||
q1UsedResource, 4);
|
||||
QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics();
|
||||
asserteMetrics(queue1Metrics, 2, 0, 2, 0, 4,
|
||||
q1availableResources.getMemory(), q1availableResources.getVirtualCores(),
|
||||
q1UsedResource.getMemory(), q1UsedResource.getVirtualCores());
|
||||
|
||||
// assert queue B state.
|
||||
SchedulerApplication schedulerApp2 =
|
||||
schedulerApps.get(app2.getApplicationId());
|
||||
checkCSLeafQueue(rm2, schedulerApp2, clusterResource, q2Resource,
|
||||
q2UsedResource, 2);
|
||||
QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics();
|
||||
asserteMetrics(queue2Metrics, 1, 0, 1, 0, 2,
|
||||
q2availableResources.getMemory(), q2availableResources.getVirtualCores(),
|
||||
q2UsedResource.getMemory(), q2UsedResource.getVirtualCores());
|
||||
|
||||
// assert parent queue state.
|
||||
LeafQueue leafQueue = (LeafQueue) schedulerApp2.getQueue();
|
||||
ParentQueue parentQueue = (ParentQueue) leafQueue.getParent();
|
||||
checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16,
|
||||
(float) 6 / 16);
|
||||
asserteMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
|
||||
totalAvailableResource.getMemory(),
|
||||
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
|
||||
totalUsedResource.getVirtualCores());
|
||||
}
|
||||
|
||||
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
|
||||
Resource usedResource, float UsedCapacity, float absoluteUsedCapacity) {
|
||||
assertEquals(numContainers, parentQueue.getNumContainers());
|
||||
assertEquals(usedResource, parentQueue.getUsedResources());
|
||||
assertEquals(UsedCapacity, parentQueue.getUsedCapacity(), 1e-8);
|
||||
assertEquals(absoluteUsedCapacity, parentQueue.getAbsoluteUsedCapacity(), 1e-8);
|
||||
}
|
||||
|
||||
// Test RM shuts down, in the meanwhile, AM fails. Restarted RM scheduler
|
||||
// should not recover the containers that belong to the failed AM.
|
||||
@Test(timeout = 20000)
|
||||
public void testAMfailedBetweenRMRestart() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
|
||||
NMContainerStatus amContainer =
|
||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1,
|
||||
ContainerState.COMPLETE);
|
||||
NMContainerStatus runningContainer =
|
||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
|
||||
ContainerState.RUNNING);
|
||||
NMContainerStatus completedContainer =
|
||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
|
||||
ContainerState.COMPLETE);
|
||||
nm1.registerNode(Arrays.asList(amContainer, runningContainer,
|
||||
completedContainer));
|
||||
rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
// Wait for RM to settle down on recovering containers;
|
||||
Thread.sleep(3000);
|
||||
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm2.getResourceScheduler();
|
||||
// Previous AM failed, The failed AM should once again release the
|
||||
// just-recovered containers.
|
||||
assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
|
||||
assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
|
||||
}
|
||||
|
||||
// Apps already completed before RM restart. Restarted RM scheduler should not
|
||||
// recover containers for completed apps.
|
||||
@Test(timeout = 20000)
|
||||
public void testContainersNotRecoveredForCompletedApps() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1);
|
||||
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
NMContainerStatus runningContainer =
|
||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
|
||||
ContainerState.RUNNING);
|
||||
NMContainerStatus completedContainer =
|
||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
|
||||
ContainerState.COMPLETE);
|
||||
nm1.registerNode(Arrays.asList(runningContainer, completedContainer));
|
||||
RMApp recoveredApp1 =
|
||||
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
|
||||
assertEquals(RMAppState.FINISHED, recoveredApp1.getState());
|
||||
|
||||
// Wait for RM to settle down on recovering containers;
|
||||
Thread.sleep(3000);
|
||||
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm2.getResourceScheduler();
|
||||
|
||||
// scheduler should not recover containers for finished apps.
|
||||
assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
|
||||
assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
|
||||
}
|
||||
|
||||
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
|
||||
int appsPending, int appsRunning, int appsCompleted,
|
||||
int allocatedContainers, int availableMB, int availableVirtualCores,
|
||||
int allocatedMB, int allocatedVirtualCores) {
|
||||
assertEquals(appsSubmitted, qm.getAppsSubmitted());
|
||||
assertEquals(appsPending, qm.getAppsPending());
|
||||
assertEquals(appsRunning, qm.getAppsRunning());
|
||||
assertEquals(appsCompleted, qm.getAppsCompleted());
|
||||
assertEquals(allocatedContainers, qm.getAllocatedContainers());
|
||||
assertEquals(availableMB, qm.getAvailableMB());
|
||||
assertEquals(availableVirtualCores, qm.getAvailableVirtualCores());
|
||||
assertEquals(allocatedMB, qm.getAllocatedMB());
|
||||
assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores());
|
||||
}
|
||||
|
||||
private void waitForNumContainersToRecover(int num, MockRM rm,
|
||||
ApplicationAttemptId attemptId) throws Exception {
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler();
|
||||
SchedulerApplicationAttempt attempt =
|
||||
scheduler.getApplicationAttempt(attemptId);
|
||||
while (attempt == null) {
|
||||
System.out.println("Wait for scheduler attempt " + attemptId
|
||||
+ " to be created");
|
||||
Thread.sleep(200);
|
||||
attempt = scheduler.getApplicationAttempt(attemptId);
|
||||
}
|
||||
while (attempt.getLiveContainers().size() < num) {
|
||||
System.out.println("Wait for " + num + " containers to recover.");
|
||||
Thread.sleep(200);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue