Merge r1604949 from trunk. YARN-2191. Added a new test to ensure NM will clean up completed applications in the case of RM restart. Contributed by Wangda Tan
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1604950 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fccacfb212
commit
5addc7d50e
|
@ -155,6 +155,9 @@ Release 2.5.0 - UNRELEASED
|
||||||
YARN-2159. Better logging in SchedulerNode#allocateContainer.
|
YARN-2159. Better logging in SchedulerNode#allocateContainer.
|
||||||
(Ray Chiang via kasha)
|
(Ray Chiang via kasha)
|
||||||
|
|
||||||
|
YARN-2191. Added a new test to ensure NM will clean up completed applications
|
||||||
|
in the case of RM restart. (Wangda Tan via jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -28,16 +28,20 @@ import java.util.Map;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
@ -46,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.apache.http.client.params.AllClientPNames;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
@ -303,7 +308,7 @@ public class TestApplicationCleanup {
|
||||||
|
|
||||||
@SuppressWarnings("resource")
|
@SuppressWarnings("resource")
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testAppCleanupWhenRestartedAfterAppFinished() throws Exception {
|
public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
@ -336,6 +341,65 @@ public class TestApplicationCleanup {
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("resource")
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception {
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
// start RM
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 1024, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
MockNM nm2 =
|
||||||
|
new MockNM("127.0.0.1:5678", 1024, rm1.getResourceTrackerService());
|
||||||
|
nm2.registerNode();
|
||||||
|
|
||||||
|
// create app and launch the AM
|
||||||
|
RMApp app0 = rm1.submitApp(200);
|
||||||
|
MockAM am0 = launchAM(app0, rm1, nm1);
|
||||||
|
|
||||||
|
// alloc another container on nm2
|
||||||
|
AllocateResponse allocResponse =
|
||||||
|
am0.allocate(Arrays.asList(ResourceRequest.newInstance(
|
||||||
|
Priority.newInstance(1), "*", Resource.newInstance(1024, 0), 1)),
|
||||||
|
null);
|
||||||
|
while (null == allocResponse.getAllocatedContainers()
|
||||||
|
|| allocResponse.getAllocatedContainers().isEmpty()) {
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
allocResponse = am0.allocate(null, null);
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
// start new RM
|
||||||
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
|
rm2.start();
|
||||||
|
|
||||||
|
// nm1/nm2 register to rm2, and do a heartbeat
|
||||||
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
|
nm1.registerNode(Arrays.asList(NMContainerStatus.newInstance(
|
||||||
|
ContainerId.newInstance(am0.getApplicationAttemptId(), 1),
|
||||||
|
ContainerState.COMPLETE, Resource.newInstance(1024, 1), "", 0)), Arrays
|
||||||
|
.asList(app0.getApplicationId()));
|
||||||
|
nm2.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
|
nm2.registerNode(Arrays.asList(app0.getApplicationId()));
|
||||||
|
|
||||||
|
// assert app state has been saved.
|
||||||
|
rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
|
||||||
|
|
||||||
|
// wait for application cleanup message received on NM1
|
||||||
|
waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
|
||||||
|
|
||||||
|
// wait for application cleanup message received on NM2
|
||||||
|
waitForAppCleanupMessageRecved(nm2, app0.getApplicationId());
|
||||||
|
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
TestApplicationCleanup t = new TestApplicationCleanup();
|
TestApplicationCleanup t = new TestApplicationCleanup();
|
||||||
|
|
Loading…
Reference in New Issue