YARN-8232. RMContainer lost queue name when RM HA happens. (Hu Ziqian via wangda)

Change-Id: Ia21e1da6871570c993bbedde76ce32929e95970f
(cherry picked from commit 6b96a73bb0)
This commit is contained in:
Wangda Tan 2018-05-08 11:34:45 -07:00 committed by Jason Lowe
parent 113e2d6801
commit 777743beb6
2 changed files with 54 additions and 12 deletions

View File

@ -513,7 +513,8 @@ public abstract class AbstractYarnScheduler
} }
// create container // create container
RMContainer rmContainer = recoverAndCreateContainer(container, nm); RMContainer rmContainer = recoverAndCreateContainer(container, nm,
schedulerApp.getQueue().getQueueName());
// recover RMContainer // recover RMContainer
rmContainer.handle( rmContainer.handle(
@ -563,7 +564,7 @@ public abstract class AbstractYarnScheduler
} }
private RMContainer recoverAndCreateContainer(NMContainerStatus status, private RMContainer recoverAndCreateContainer(NMContainerStatus status,
RMNode node) { RMNode node, String queueName) {
Container container = Container container =
Container.newInstance(status.getContainerId(), node.getNodeID(), Container.newInstance(status.getContainerId(), node.getNodeID(),
node.getHttpAddress(), status.getAllocatedResource(), node.getHttpAddress(), status.getAllocatedResource(),
@ -576,6 +577,7 @@ public abstract class AbstractYarnScheduler
SchedulerRequestKey.extractFrom(container), attemptId, node.getNodeID(), SchedulerRequestKey.extractFrom(container), attemptId, node.getNodeID(),
applications.get(attemptId.getApplicationId()).getUser(), rmContext, applications.get(attemptId.getApplicationId()).getUser(), rmContext,
status.getCreationTime(), status.getNodeLabelExpression()); status.getCreationTime(), status.getNodeLabelExpression());
((RMContainerImpl) rmContainer).setQueueName(queueName);
return rmContainer; return rmContainer;
} }

View File

@ -30,22 +30,14 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@ -59,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -874,4 +867,51 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
rm.stop(); rm.stop();
} }
} }
@Test(timeout=60000)
public void testContainerRecoveredByNode() throws Exception {
System.out.println("Starting testContainerRecoveredByNode");
final int maxMemory = 10 * 1024;
YarnConfiguration conf = getConf();
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
conf.set(
YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
MockRM rm1 = new MockRM(conf);
try {
rm1.start();
RMApp app1 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default",
-1, null, "Test", false, true);
MockNM nm1 =
new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
nm1.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("127.0.0.1", 8192, 1, new ArrayList<ContainerId>());
YarnScheduler scheduler = rm1.getResourceScheduler();
RMNode node1 = MockNodes.newNodeInfo(
0, Resources.createResource(maxMemory), 1, "127.0.0.2");
ContainerId containerId = ContainerId.newContainerId(
app1.getCurrentAppAttempt().getAppAttemptId(), 2);
NMContainerStatus containerReport =
NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING,
Resource.newInstance(1024, 1), "recover container", 0,
Priority.newInstance(0), 0);
List<NMContainerStatus> containerReports = new ArrayList<>();
containerReports.add(containerReport);
scheduler.handle(new NodeAddedSchedulerEvent(node1, containerReports));
RMContainer rmContainer = scheduler.getRMContainer(containerId);
//verify queue name when rmContainer is recovered
Assert.assertEquals(app1.getQueue(), rmContainer.getQueueName());
} finally {
rm1.stop();
System.out.println("Stopping testContainerRecoveredByNode");
}
}
} }