YARN-7786. NullPointerException while launching ApplicationMaster. Contributed by lujie
(cherry picked from commit 766544c0b0
)
This commit is contained in:
parent
f8330d969d
commit
4bff2df137
|
@ -189,6 +189,10 @@ public class AMLauncher implements Runnable {
|
||||||
ContainerLaunchContext container =
|
ContainerLaunchContext container =
|
||||||
applicationMasterContext.getAMContainerSpec();
|
applicationMasterContext.getAMContainerSpec();
|
||||||
|
|
||||||
|
if (container == null){
|
||||||
|
throw new IOException(containerID +
|
||||||
|
" has been cleaned before launched");
|
||||||
|
}
|
||||||
// Finalize the container
|
// Finalize the container
|
||||||
setupTokens(container, containerID);
|
setupTokens(container, containerID);
|
||||||
// set the flow context optionally for timeline service v.2
|
// set the flow context optionally for timeline service v.2
|
||||||
|
@ -305,11 +309,7 @@ public class AMLauncher implements Runnable {
|
||||||
handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
|
handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
|
||||||
RMAppAttemptEventType.LAUNCHED));
|
RMAppAttemptEventType.LAUNCHED));
|
||||||
} catch(Exception ie) {
|
} catch(Exception ie) {
|
||||||
String message = "Error launching " + application.getAppAttemptId()
|
onAMLaunchFailed(masterContainer.getId(), ie);
|
||||||
+ ". Got exception: " + StringUtils.stringifyException(ie);
|
|
||||||
LOG.info(message);
|
|
||||||
handler.handle(new RMAppAttemptEvent(application
|
|
||||||
.getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED, message));
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case CLEANUP:
|
case CLEANUP:
|
||||||
|
@ -344,4 +344,13 @@ public class AMLauncher implements Runnable {
|
||||||
throw (IOException) t;
|
throw (IOException) t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
protected void onAMLaunchFailed(ContainerId containerId, Exception ie) {
|
||||||
|
String message = "Error launching " + application.getAppAttemptId()
|
||||||
|
+ ". Got exception: " + StringUtils.stringifyException(ie);
|
||||||
|
LOG.info(message);
|
||||||
|
handler.handle(new RMAppAttemptEvent(application
|
||||||
|
.getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED, message));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,12 +24,14 @@ import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
@ -73,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||||
|
@ -83,6 +86,9 @@ import org.apache.log4j.Logger;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -216,10 +222,14 @@ public class TestApplicationMasterLauncher {
|
||||||
// kick the scheduling
|
// kick the scheduling
|
||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
|
|
||||||
int waitCount = 0;
|
try {
|
||||||
while (containerManager.launched == false && waitCount++ < 20) {
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
LOG.info("Waiting for AM Launch to happen..");
|
@Override public Boolean get() {
|
||||||
Thread.sleep(1000);
|
return containerManager.launched;
|
||||||
|
}
|
||||||
|
}, 100, 200 * 100);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
fail("timed out while waiting for AM Launch to happen.");
|
||||||
}
|
}
|
||||||
Assert.assertTrue(containerManager.launched);
|
Assert.assertTrue(containerManager.launched);
|
||||||
|
|
||||||
|
@ -246,10 +256,14 @@ public class TestApplicationMasterLauncher {
|
||||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||||
|
|
||||||
waitCount = 0;
|
try {
|
||||||
while (containerManager.cleanedup == false && waitCount++ < 20) {
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
LOG.info("Waiting for AM Cleanup to happen..");
|
@Override public Boolean get() {
|
||||||
Thread.sleep(1000);
|
return containerManager.cleanedup;
|
||||||
|
}
|
||||||
|
}, 100, 200 * 100);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
fail("timed out while waiting for AM cleanup to happen.");
|
||||||
}
|
}
|
||||||
Assert.assertTrue(containerManager.cleanedup);
|
Assert.assertTrue(containerManager.cleanedup);
|
||||||
|
|
||||||
|
@ -257,6 +271,48 @@ public class TestApplicationMasterLauncher {
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAMCleanupBeforeLaunch() throws Exception {
|
||||||
|
MockRM rm = new MockRM();
|
||||||
|
rm.start();
|
||||||
|
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);
|
||||||
|
RMApp app = rm.submitApp(2000);
|
||||||
|
// kick the scheduling
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
|
|
||||||
|
try {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override public Boolean get() {
|
||||||
|
return attempt.getMasterContainer() != null;
|
||||||
|
}
|
||||||
|
}, 10, 200 * 100);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
fail("timed out while waiting for AM Launch to happen.");
|
||||||
|
}
|
||||||
|
|
||||||
|
//send kill before launch
|
||||||
|
rm.killApp(app.getApplicationId());
|
||||||
|
rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
|
||||||
|
//Launch after kill
|
||||||
|
AMLauncher launcher = new AMLauncher(rm.getRMContext(),
|
||||||
|
attempt, AMLauncherEventType.LAUNCH, rm.getConfig()) {
|
||||||
|
@Override
|
||||||
|
public void onAMLaunchFailed(ContainerId containerId, Exception e) {
|
||||||
|
Assert.assertFalse("NullPointerException happens "
|
||||||
|
+ " while launching " + containerId,
|
||||||
|
e instanceof NullPointerException);
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
protected ContainerManagementProtocol getContainerMgrProxy(
|
||||||
|
ContainerId containerId) {
|
||||||
|
return new MyContainerManagerImpl();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
launcher.run();
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRetriesOnFailures() throws Exception {
|
public void testRetriesOnFailures() throws Exception {
|
||||||
final ContainerManagementProtocol mockProxy =
|
final ContainerManagementProtocol mockProxy =
|
||||||
|
|
Loading…
Reference in New Issue