From 3a7bfdf3180abae27cfcbf0dab7ff743a044994d Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Mon, 27 Apr 2015 14:58:16 -0700 Subject: [PATCH] MAPREDUCE-6324. Fixed MapReduce uber jobs to not fail the udpate of AM-RM tokens when they roll-over. Contributed by Jason Lowe. (cherry picked from commit 9fc32c5c4d1d5f50c605bdb0e3b13f44c86660c8) --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/local/LocalContainerAllocator.java | 28 +++- .../local/TestLocalContainerAllocator.java | 152 ++++++++++++++++-- 3 files changed, 172 insertions(+), 11 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 518b6e3ee98..2af375dc095 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -17,6 +17,9 @@ Release 2.7.1 - UNRELEASED MAPREDUCE-6238. MR2 can't run local jobs with -libjars command options which is a regression from MR1 (zxu via rkanter) + MAPREDUCE-6324. Fixed MapReduce uber jobs to not fail the udpate of AM-RM + tokens when they roll-over. (Jason Lowe via vinodkv) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java index 74dfb39af49..aed10233142 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java @@ -18,11 +18,13 @@ package org.apache.hadoop.mapreduce.v2.app.local; +import java.io.IOException; import java.util.ArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; @@ -35,17 +37,22 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssigned import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; /** * Allocates containers locally. Doesn't allocate a real container; @@ -99,8 +106,9 @@ public class LocalContainerAllocator extends RMCommunicator AllocateRequest.newInstance(this.lastResponseID, super.getApplicationProgress(), new ArrayList(), new ArrayList(), null); + AllocateResponse allocateResponse = null; try { - scheduler.allocate(allocateRequest); + allocateResponse = scheduler.allocate(allocateRequest); // Reset retry count if no exception occurred. retrystartTime = System.currentTimeMillis(); } catch (ApplicationAttemptNotFoundException e) { @@ -131,6 +139,24 @@ public class LocalContainerAllocator extends RMCommunicator // continue to attempt to contact the RM. throw e; } + + if (allocateResponse != null) { + this.lastResponseID = allocateResponse.getResponseId(); + Token token = allocateResponse.getAMRMToken(); + if (token != null) { + updateAMRMToken(token); + } + } + } + + private void updateAMRMToken(Token token) throws IOException { + org.apache.hadoop.security.token.Token amrmToken = + new org.apache.hadoop.security.token.Token(token + .getIdentifier().array(), token.getPassword().array(), new Text( + token.getKind()), new Text(token.getService())); + UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); + currentUGI.addToken(amrmToken); + amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig())); } @SuppressWarnings("unchecked") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java index 90dbe489f4b..f901ed8f100 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java @@ -22,23 +22,43 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -48,8 +68,13 @@ public class TestLocalContainerAllocator { public void testRMConnectionRetry() throws Exception { // verify the connection exception is thrown // if we haven't exhausted the retry interval + ApplicationMasterProtocol mockScheduler = + mock(ApplicationMasterProtocol.class); + when(mockScheduler.allocate(isA(AllocateRequest.class))) + .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail"))); Configuration conf = new Configuration(); - LocalContainerAllocator lca = new StubbedLocalContainerAllocator(); + LocalContainerAllocator lca = + new StubbedLocalContainerAllocator(mockScheduler); lca.init(conf); lca.start(); try { @@ -63,7 +88,7 @@ public class TestLocalContainerAllocator { // verify YarnRuntimeException is thrown when the retry interval has expired conf.setLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0); - lca = new StubbedLocalContainerAllocator(); + lca = new StubbedLocalContainerAllocator(mockScheduler); lca.init(conf); lca.start(); try { @@ -76,12 +101,84 @@ public class TestLocalContainerAllocator { } } + @Test + public void testAllocResponseId() throws Exception { + ApplicationMasterProtocol scheduler = new MockScheduler(); + Configuration conf = new Configuration(); + LocalContainerAllocator lca = + new StubbedLocalContainerAllocator(scheduler); + lca.init(conf); + lca.start(); + + // do two heartbeats to verify the response ID is being tracked + lca.heartbeat(); + lca.heartbeat(); + lca.close(); + } + + @Test + public void testAMRMTokenUpdate() throws Exception { + Configuration conf = new Configuration(); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1, 1), 1); + AMRMTokenIdentifier oldTokenId = new AMRMTokenIdentifier(attemptId, 1); + AMRMTokenIdentifier newTokenId = new AMRMTokenIdentifier(attemptId, 2); + Token oldToken = new Token( + oldTokenId.getBytes(), "oldpassword".getBytes(), oldTokenId.getKind(), + new Text()); + Token newToken = new Token( + newTokenId.getBytes(), "newpassword".getBytes(), newTokenId.getKind(), + new Text()); + + MockScheduler scheduler = new MockScheduler(); + scheduler.amToken = newToken; + + final LocalContainerAllocator lca = + new StubbedLocalContainerAllocator(scheduler); + lca.init(conf); + lca.start(); + + UserGroupInformation testUgi = UserGroupInformation.createUserForTesting( + "someuser", new String[0]); + testUgi.addToken(oldToken); + testUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + lca.heartbeat(); + return null; + } + }); + lca.close(); + + // verify there is only one AMRM token in the UGI and it matches the + // updated token from the RM + int tokenCount = 0; + Token ugiToken = null; + for (Token token : testUgi.getTokens()) { + if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())) { + ugiToken = token; + ++tokenCount; + } + } + + Assert.assertEquals("too many AMRM tokens", 1, tokenCount); + Assert.assertArrayEquals("token identifier not updated", + newToken.getIdentifier(), ugiToken.getIdentifier()); + Assert.assertArrayEquals("token password not updated", + newToken.getPassword(), ugiToken.getPassword()); + Assert.assertEquals("AMRM token service not updated", + new Text(ClientRMProxy.getAMRMTokenService(conf)), + ugiToken.getService()); + } + private static class StubbedLocalContainerAllocator extends LocalContainerAllocator { + private ApplicationMasterProtocol scheduler; - public StubbedLocalContainerAllocator() { + public StubbedLocalContainerAllocator(ApplicationMasterProtocol scheduler) { super(mock(ClientService.class), createAppContext(), "nmhost", 1, 2, null); + this.scheduler = scheduler; } @Override @@ -99,13 +196,6 @@ public class TestLocalContainerAllocator { @Override protected ApplicationMasterProtocol createSchedulerProxy() { - ApplicationMasterProtocol scheduler = mock(ApplicationMasterProtocol.class); - try { - when(scheduler.allocate(isA(AllocateRequest.class))) - .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail"))); - } catch (YarnException e) { - } catch (IOException e) { - } return scheduler; } @@ -126,4 +216,46 @@ public class TestLocalContainerAllocator { return ctx; } } + + private static class MockScheduler implements ApplicationMasterProtocol { + int responseId = 0; + Token amToken = null; + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return null; + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + return null; + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + Assert.assertEquals("response ID mismatch", + responseId, request.getResponseId()); + ++responseId; + org.apache.hadoop.yarn.api.records.Token yarnToken = null; + if (amToken != null) { + yarnToken = org.apache.hadoop.yarn.api.records.Token.newInstance( + amToken.getIdentifier(), amToken.getKind().toString(), + amToken.getPassword(), amToken.getService().toString()); + } + return AllocateResponse.newInstance(responseId, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Resources.none(), null, 1, null, + Collections.emptyList(), + yarnToken, + Collections.emptyList(), + Collections.emptyList()); + } + } }