From d481684c7c9293a94f54ef622a92753531c6acc7 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 23 Jun 2015 11:39:55 -0700 Subject: [PATCH] MAPREDUCE-6410. Fixed MapReduce JobHistory server to use the right (login) UGI to refresh log and cleaner settings. Contributed by Varun Saxena. --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/hs/server/HSAdminServer.java | 43 ++++++++++++++- .../v2/hs/server/TestHSAdminServer.java | 55 +++++++++++++++++++ 3 files changed, 99 insertions(+), 2 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0cefde0c5f0..5eae44eb27d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -548,6 +548,9 @@ Release 2.7.1 - UNRELEASED MAPREDUCE-6387. Serialize the recently added Task#encryptedSpillKey field at the end. (Arun Suresh via kasha) + MAPREDUCE-6410. Fixed MapReduce JobHistory server to use the right (login) + UGI to refresh log and cleaner settings. (Varun Saxena via vinodkv) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java index ef79bf87f64..3fef5e278b0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.hs.server; import java.io.IOException; import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.hs.JobHistory; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.HSAdminRefreshProtocolService; import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolServerSideTranslatorPB; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; @Private @@ -67,6 +69,8 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol { private static final String HISTORY_ADMIN_SERVER = "HSAdminServer"; private JobHistory jobHistoryService = null; + private UserGroupInformation loginUGI; + public HSAdminServer(AggregatedLogDeletionService aggLogDelService, JobHistory jobHistoryService) { super(HSAdminServer.class.getName()); @@ -125,9 +129,24 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol { @Override protected void serviceStart() throws Exception { + if (UserGroupInformation.isSecurityEnabled()) { + loginUGI = UserGroupInformation.getLoginUser(); + } else { + loginUGI = UserGroupInformation.getCurrentUser(); + } clientRpcServer.start(); } + @VisibleForTesting + UserGroupInformation getLoginUGI() { + return loginUGI; + } + + @VisibleForTesting + void setLoginUGI(UserGroupInformation ugi) { + loginUGI = ugi; + } + @Override protected void serviceStop() throws Exception { if (clientRpcServer != null) { @@ -233,7 +252,17 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol { public void refreshLogRetentionSettings() throws IOException { UserGroupInformation user = checkAcls("refreshLogRetentionSettings"); - aggLogDelService.refreshLogRetentionSettings(); + try { + loginUGI.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws IOException { + aggLogDelService.refreshLogRetentionSettings(); + return null; + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } HSAuditLogger.logSuccess(user.getShortUserName(), "refreshLogRetentionSettings", "HSAdminServer"); @@ -243,7 +272,17 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol { public void refreshJobRetentionSettings() throws IOException { UserGroupInformation user = checkAcls("refreshJobRetentionSettings"); - jobHistoryService.refreshJobRetentionSettings(); + try { + loginUGI.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws IOException { + jobHistoryService.refreshJobRetentionSettings(); + return null; + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } HSAuditLogger.logSuccess(user.getShortUserName(), "refreshJobRetentionSettings", HISTORY_ADMIN_SERVER); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java index 7ab90f01084..d8313479f32 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java @@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.hs.server; import static org.junit.Assert.*; import java.io.IOException; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -45,6 +47,9 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; @@ -286,6 +291,56 @@ public class TestHSAdminServer { verify(jobHistoryService).refreshJobRetentionSettings(); } + @SuppressWarnings("unchecked") + @Test + public void testUGIForLogAndJobRefresh() throws Exception { + UserGroupInformation ugi = + UserGroupInformation.createUserForTesting("test", new String[] {"grp"}); + UserGroupInformation loginUGI = spy(hsAdminServer.getLoginUGI()); + hsAdminServer.setLoginUGI(loginUGI); + + // Run refresh log retention settings with test user + ugi.doAs(new PrivilegedAction() { + @Override + public Void run() { + String[] args = new String[1]; + args[0] = "-refreshLogRetentionSettings"; + try { + hsAdminClient.run(args); + } catch (Exception e) { + fail("refreshLogRetentionSettings should have been successful"); + } + return null; + } + }); + // Verify if AggregatedLogDeletionService#refreshLogRetentionSettings was + // called with login UGI, instead of the UGI command was run with. + verify(loginUGI).doAs(any(PrivilegedExceptionAction.class)); + verify(alds).refreshLogRetentionSettings(); + + // Reset for refresh job retention settings + reset(loginUGI); + + // Run refresh job retention settings with test user + ugi.doAs(new PrivilegedAction() { + @Override + public Void run() { + String[] args = new String[1]; + args[0] = "-refreshJobRetentionSettings"; + try { + hsAdminClient.run(args); + } catch (Exception e) { + fail("refreshJobRetentionSettings should have been successful"); + } + return null; + } + }); + // Verify if JobHistory#refreshJobRetentionSettings was called with + // login UGI, instead of the UGI command was run with. + verify(loginUGI).doAs(any(PrivilegedExceptionAction.class)); + verify(jobHistoryService).refreshJobRetentionSettings(); + } + @After public void cleanUp() { if (hsAdminServer != null)