YARN-6234. Support multiple attempts on the node when AMRMProxy is enabled. (Giovanni Matteo Fumarola via Subru).

This commit is contained in:
Subru Krishnan 2017-05-08 16:41:30 -07:00
parent 1769b12a77
commit cd9ff27ffc
2 changed files with 63 additions and 5 deletions

View File

@ -270,18 +270,40 @@ public class AMRMProxyService extends AbstractService implements
* @param user
* @param amrmToken
*/
protected void initializePipeline(
ApplicationAttemptId applicationAttemptId, String user,
Token<AMRMTokenIdentifier> amrmToken,
protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
String user, Token<AMRMTokenIdentifier> amrmToken,
Token<AMRMTokenIdentifier> localToken) {
RequestInterceptorChainWrapper chainWrapper = null;
synchronized (applPipelineMap) {
if (applPipelineMap.containsKey(applicationAttemptId.getApplicationId())) {
if (applPipelineMap
.containsKey(applicationAttemptId.getApplicationId())) {
LOG.warn("Request to start an already existing appId was received. "
+ " This can happen if an application failed and a new attempt "
+ "was created on this machine. ApplicationId: "
+ applicationAttemptId.toString());
return;
RequestInterceptorChainWrapper chainWrapperBackup =
this.applPipelineMap.get(applicationAttemptId.getApplicationId());
if (chainWrapperBackup != null
&& chainWrapperBackup.getApplicationAttemptId() != null
&& !chainWrapperBackup.getApplicationAttemptId()
.equals(applicationAttemptId)) {
// Remove the existing pipeline
LOG.info("Remove the previous pipeline for ApplicationId: "
+ applicationAttemptId.toString());
RequestInterceptorChainWrapper pipeline =
applPipelineMap.remove(applicationAttemptId.getApplicationId());
try {
pipeline.getRootInterceptor().shutdown();
} catch (Throwable ex) {
LOG.warn(
"Failed to shutdown the request processing pipeline for app:"
+ applicationAttemptId.getApplicationId(),
ex);
}
} else {
return;
}
}
chainWrapper = new RequestInterceptorChainWrapper();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -27,10 +28,14 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@ -380,6 +385,37 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
}
}
@Test
public void testMultipleAttemptsSameNode()
throws YarnException, IOException, Exception {
String user = "hadoop";
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId applicationAttemptId;
// First Attempt
RegisterApplicationMasterResponse response1 =
registerApplicationMaster(appId.getId());
Assert.assertNotNull(response1);
AllocateResponse allocateResponse = allocate(appId.getId());
Assert.assertNotNull(allocateResponse);
// Second Attempt
applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2);
getAMRMProxyService().initializePipeline(applicationAttemptId, user, null,
null);
RequestInterceptorChainWrapper chain2 =
getAMRMProxyService().getPipelines().get(appId);
Assert.assertEquals(applicationAttemptId, chain2.getApplicationAttemptId());
allocateResponse = allocate(appId.getId());
Assert.assertNotNull(allocateResponse);
}
private List<Container> getContainersAndAssert(int appId,
int numberOfResourceRequests) throws Exception {
AllocateRequest allocateRequest =