YARN-8869. YARN Service Client might not work correctly with RM REST API for Kerberos authentication. Contributed by Eric Yang.

(cherry picked from commit fa94d370b6)
This commit is contained in:
Sunil G 2018-10-15 21:21:57 +05:30
parent aff21d7271
commit 1ca5f974f1
2 changed files with 120 additions and 12 deletions

View File

@ -70,7 +70,6 @@ import org.slf4j.LoggerFactory;
import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.WebResource.Builder; import com.sun.jersey.api.client.WebResource.Builder;
import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig;
@ -144,7 +143,7 @@ public class ApiServiceClient extends AppAdminClient {
/** /**
* Calculate Resource Manager address base on working REST API. * Calculate Resource Manager address base on working REST API.
*/ */
private String getRMWebAddress() { String getRMWebAddress() {
Configuration conf = getConfig(); Configuration conf = getConfig();
String scheme = "http://"; String scheme = "http://";
String path = "/app/v1/services/version"; String path = "/app/v1/services/version";
@ -156,8 +155,7 @@ public class ApiServiceClient extends AppAdminClient {
.get("yarn.resourcemanager.webapp.https.address"); .get("yarn.resourcemanager.webapp.https.address");
} }
boolean useKerberos = UserGroupInformation.isSecurityEnabled(); boolean useKerberos = UserGroupInformation.isSecurityEnabled();
List<String> rmServers = RMHAUtils List<String> rmServers = getRMHAWebAddresses(conf);
.getRMHAWebappAddresses(new YarnConfiguration(conf));
for (String host : rmServers) { for (String host : rmServers) {
try { try {
Client client = Client.create(); Client client = Client.create();
@ -175,16 +173,16 @@ public class ApiServiceClient extends AppAdminClient {
LOG.debug("Fail to resolve username: {}", e); LOG.debug("Fail to resolve username: {}", e);
} }
} }
WebResource webResource = client Builder builder = client
.resource(sb.toString()); .resource(sb.toString()).type(MediaType.APPLICATION_JSON);
if (useKerberos) { if (useKerberos) {
String[] server = host.split(":"); String[] server = host.split(":");
String challenge = generateToken(server[0]); String challenge = generateToken(server[0]);
webResource.header(HttpHeaders.AUTHORIZATION, "Negotiate " + builder.header(HttpHeaders.AUTHORIZATION, "Negotiate " +
challenge); challenge);
LOG.debug("Authorization: Negotiate {}", challenge); LOG.debug("Authorization: Negotiate {}", challenge);
} }
ClientResponse test = webResource.get(ClientResponse.class); ClientResponse test = builder.get(ClientResponse.class);
if (test.getStatus() == 200) { if (test.getStatus() == 200) {
rmAddress = host; rmAddress = host;
break; break;
@ -197,6 +195,11 @@ public class ApiServiceClient extends AppAdminClient {
return scheme+rmAddress; return scheme+rmAddress;
} }
List<String> getRMHAWebAddresses(Configuration conf) {
return RMHAUtils
.getRMHAWebappAddresses(new YarnConfiguration(conf));
}
/** /**
* Compute active resource manager API service location. * Compute active resource manager API service location.
* *

View File

@ -21,11 +21,19 @@ package org.apache.hadoop.yarn.service.client;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.io.File; import java.io.File;
import java.io.IOException;
import javax.security.sasl.Sasl; import javax.security.sasl.Sasl;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Map; import java.util.Map;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.minikdc.KerberosSecurityTestcase; import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
@ -33,6 +41,13 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.log4j.Logger;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -55,29 +70,119 @@ public class TestSecureApiServiceClient extends KerberosSecurityTestcase {
private File keytabFile; private File keytabFile;
private Configuration conf = new Configuration(); private Configuration testConf = new Configuration();
private Map<String, String> props; private Map<String, String> props;
private static Server server;
private static Logger LOG = Logger
.getLogger(TestSecureApiServiceClient.class);
private ApiServiceClient asc;
/**
* A mocked version of API Service for testing purpose.
*
*/
@SuppressWarnings("serial")
public static class TestServlet extends HttpServlet {
private static boolean headerFound = false;
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
Enumeration<String> headers = req.getHeaderNames();
while(headers.hasMoreElements()) {
String header = headers.nextElement();
LOG.info(header);
}
if (req.getHeader("Authorization")!=null) {
headerFound = true;
resp.setStatus(HttpServletResponse.SC_OK);
} else {
headerFound = false;
resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
}
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
resp.setStatus(HttpServletResponse.SC_OK);
}
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
resp.setStatus(HttpServletResponse.SC_OK);
}
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
resp.setStatus(HttpServletResponse.SC_OK);
}
public static boolean isHeaderExist() {
return headerFound;
}
}
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
keytabFile = new File(getWorkDir(), "keytab"); keytabFile = new File(getWorkDir(), "keytab");
getKdc().createPrincipal(keytabFile, clientPrincipal, server1Principal, getKdc().createPrincipal(keytabFile, clientPrincipal, server1Principal,
server2Principal); server2Principal);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS,
UserGroupInformation.setConfiguration(conf); testConf);
UserGroupInformation.setConfiguration(testConf);
UserGroupInformation.setShouldRenewImmediatelyForTests(true); UserGroupInformation.setShouldRenewImmediatelyForTests(true);
props = new HashMap<String, String>(); props = new HashMap<String, String>();
props.put(Sasl.QOP, QualityOfProtection.AUTHENTICATION.saslQop); props.put(Sasl.QOP, QualityOfProtection.AUTHENTICATION.saslQop);
server = new Server(8088);
((QueuedThreadPool)server.getThreadPool()).setMaxThreads(10);
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/app");
server.setHandler(context);
context.addServlet(new ServletHolder(TestServlet.class), "/*");
((ServerConnector)server.getConnectors()[0]).setHost("localhost");
server.start();
List<String> rmServers = new ArrayList<String>();
rmServers.add("localhost:8088");
testConf.set("yarn.resourcemanager.webapp.address",
"localhost:8088");
asc = new ApiServiceClient() {
@Override
List<String> getRMHAWebAddresses(Configuration conf) {
return rmServers;
}
};
asc.serviceInit(testConf);
}
@After
public void tearDown() throws Exception {
server.stop();
} }
@Test @Test
public void testHttpSpnegoChallenge() throws Exception { public void testHttpSpnegoChallenge() throws Exception {
UserGroupInformation.loginUserFromKeytab(clientPrincipal, keytabFile UserGroupInformation.loginUserFromKeytab(clientPrincipal, keytabFile
.getCanonicalPath()); .getCanonicalPath());
ApiServiceClient asc = new ApiServiceClient(); asc = new ApiServiceClient();
String challenge = asc.generateToken("localhost"); String challenge = asc.generateToken("localhost");
assertNotNull(challenge); assertNotNull(challenge);
} }
@Test
public void testAuthorizationHeader() throws Exception {
UserGroupInformation.loginUserFromKeytab(clientPrincipal, keytabFile
.getCanonicalPath());
String rmAddress = asc.getRMWebAddress();
if (TestServlet.isHeaderExist()) {
assertEquals(rmAddress, "http://localhost:8088");
} else {
fail("Did not see Authorization header.");
}
}
} }