YARN-4180. AMLauncher does not retry on failures when talking to NM. (adhoot)
(cherry picked from commit 9735afe967
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
This commit is contained in:
parent
600022ae69
commit
22f2501476
|
@ -947,6 +947,9 @@ Release 2.7.2 - UNRELEASED
|
||||||
YARN-3624. ApplicationHistoryServer should not reverse the order of the
|
YARN-3624. ApplicationHistoryServer should not reverse the order of the
|
||||||
filters it gets. (Mit Desai via xgong)
|
filters it gets. (Mit Desai via xgong)
|
||||||
|
|
||||||
|
YARN-4180. AMLauncher does not retry on failures when talking to NM.
|
||||||
|
(adhoot)
|
||||||
|
|
||||||
Release 2.7.1 - 2015-07-06
|
Release 2.7.1 - 2015-07-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.client.NMProxy;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
@ -151,10 +151,10 @@ public class AMLauncher implements Runnable {
|
||||||
final ContainerId containerId) {
|
final ContainerId containerId) {
|
||||||
|
|
||||||
final NodeId node = masterContainer.getNodeId();
|
final NodeId node = masterContainer.getNodeId();
|
||||||
final InetSocketAddress containerManagerBindAddress =
|
final InetSocketAddress containerManagerConnectAddress =
|
||||||
NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
|
NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
|
||||||
|
|
||||||
final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
|
final YarnRPC rpc = getYarnRPC();
|
||||||
|
|
||||||
UserGroupInformation currentUser =
|
UserGroupInformation currentUser =
|
||||||
UserGroupInformation.createRemoteUser(containerId
|
UserGroupInformation.createRemoteUser(containerId
|
||||||
|
@ -168,18 +168,15 @@ public class AMLauncher implements Runnable {
|
||||||
rmContext.getNMTokenSecretManager().createNMToken(
|
rmContext.getNMTokenSecretManager().createNMToken(
|
||||||
containerId.getApplicationAttemptId(), node, user);
|
containerId.getApplicationAttemptId(), node, user);
|
||||||
currentUser.addToken(ConverterUtils.convertFromYarn(token,
|
currentUser.addToken(ConverterUtils.convertFromYarn(token,
|
||||||
containerManagerBindAddress));
|
containerManagerConnectAddress));
|
||||||
|
|
||||||
return currentUser
|
return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
|
||||||
.doAs(new PrivilegedAction<ContainerManagementProtocol>() {
|
currentUser, rpc, containerManagerConnectAddress);
|
||||||
|
|
||||||
@Override
|
|
||||||
public ContainerManagementProtocol run() {
|
|
||||||
return (ContainerManagementProtocol) rpc.getProxy(
|
|
||||||
ContainerManagementProtocol.class,
|
|
||||||
containerManagerBindAddress, conf);
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected YarnRPC getYarnRPC() {
|
||||||
|
return YarnRPC.create(conf); // TODO: Don't create again and again.
|
||||||
}
|
}
|
||||||
|
|
||||||
private ContainerLaunchContext createAMContainerLaunchContext(
|
private ContainerLaunchContext createAMContainerLaunchContext(
|
||||||
|
|
|
@ -164,10 +164,14 @@ public class MockRM extends ResourceManager {
|
||||||
public void waitForState(ApplicationAttemptId attemptId,
|
public void waitForState(ApplicationAttemptId attemptId,
|
||||||
RMAppAttemptState finalState)
|
RMAppAttemptState finalState)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
waitForState(attemptId, finalState, 40000);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitForState(ApplicationAttemptId attemptId,
|
||||||
|
RMAppAttemptState finalState, int timeoutMsecs) throws Exception {
|
||||||
RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
|
RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
|
||||||
Assert.assertNotNull("app shouldn't be null", app);
|
Assert.assertNotNull("app shouldn't be null", app);
|
||||||
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
|
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
|
||||||
final int timeoutMsecs = 40000;
|
|
||||||
final int minWaitMsecs = 1000;
|
final int minWaitMsecs = 1000;
|
||||||
final int waitMsPerLoop = 10;
|
final int waitMsPerLoop = 10;
|
||||||
int loop = 0;
|
int loop = 0;
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -49,14 +50,19 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
@ -67,6 +73,10 @@ import org.apache.log4j.Logger;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TestApplicationMasterLauncher {
|
public class TestApplicationMasterLauncher {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
|
@ -194,6 +204,60 @@ public class TestApplicationMasterLauncher {
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetriesOnFailures() throws Exception {
|
||||||
|
final ContainerManagementProtocol mockProxy =
|
||||||
|
mock(ContainerManagementProtocol.class);
|
||||||
|
final StartContainersResponse mockResponse =
|
||||||
|
mock(StartContainersResponse.class);
|
||||||
|
when(mockProxy.startContainers(any(StartContainersRequest.class)))
|
||||||
|
.thenThrow(new NMNotYetReadyException("foo")).thenReturn(mockResponse);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
|
conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1);
|
||||||
|
final DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
|
MockRM rm = new MockRMWithCustomAMLauncher(conf, null) {
|
||||||
|
@Override
|
||||||
|
protected ApplicationMasterLauncher createAMLauncher() {
|
||||||
|
return new ApplicationMasterLauncher(getRMContext()) {
|
||||||
|
@Override
|
||||||
|
protected Runnable createRunnableLauncher(RMAppAttempt application,
|
||||||
|
AMLauncherEventType event) {
|
||||||
|
return new AMLauncher(context, application, event, getConfig()) {
|
||||||
|
@Override
|
||||||
|
protected YarnRPC getYarnRPC() {
|
||||||
|
YarnRPC mockRpc = mock(YarnRPC.class);
|
||||||
|
|
||||||
|
when(mockRpc.getProxy(
|
||||||
|
any(Class.class),
|
||||||
|
any(InetSocketAddress.class),
|
||||||
|
any(Configuration.class)))
|
||||||
|
.thenReturn(mockProxy);
|
||||||
|
return mockRpc;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Dispatcher createDispatcher() {
|
||||||
|
return dispatcher;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm.start();
|
||||||
|
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);
|
||||||
|
|
||||||
|
RMApp app = rm.submitApp(2000);
|
||||||
|
final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
|
||||||
|
.getAppAttemptId();
|
||||||
|
|
||||||
|
// kick the scheduling
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
rm.waitForState(appAttemptId, RMAppAttemptState.LAUNCHED, 500);
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
@Test(timeout = 100000)
|
@Test(timeout = 100000)
|
||||||
|
|
Loading…
Reference in New Issue