YARN-8869. YARN Service Client might not work correctly with RM REST API for Kerberos authentication. Contributed by Eric Yang.

This commit is contained in:
Sunil G 2018-10-15 21:21:57 +05:30
parent b4a38e7b3e
commit fa94d370b6
2 changed files with 120 additions and 12 deletions

View File

@ -70,7 +70,6 @@ import org.slf4j.LoggerFactory;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.WebResource.Builder;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
@ -144,7 +143,7 @@ public class ApiServiceClient extends AppAdminClient {
/**
* Calculate Resource Manager address base on working REST API.
*/
private String getRMWebAddress() {
String getRMWebAddress() {
Configuration conf = getConfig();
String scheme = "http://";
String path = "/app/v1/services/version";
@ -156,8 +155,7 @@ public class ApiServiceClient extends AppAdminClient {
.get("yarn.resourcemanager.webapp.https.address");
}
boolean useKerberos = UserGroupInformation.isSecurityEnabled();
List<String> rmServers = RMHAUtils
.getRMHAWebappAddresses(new YarnConfiguration(conf));
List<String> rmServers = getRMHAWebAddresses(conf);
for (String host : rmServers) {
try {
Client client = Client.create();
@ -175,16 +173,16 @@ public class ApiServiceClient extends AppAdminClient {
LOG.debug("Fail to resolve username: {}", e);
}
}
WebResource webResource = client
.resource(sb.toString());
Builder builder = client
.resource(sb.toString()).type(MediaType.APPLICATION_JSON);
if (useKerberos) {
String[] server = host.split(":");
String challenge = generateToken(server[0]);
webResource.header(HttpHeaders.AUTHORIZATION, "Negotiate " +
builder.header(HttpHeaders.AUTHORIZATION, "Negotiate " +
challenge);
LOG.debug("Authorization: Negotiate {}", challenge);
}
ClientResponse test = webResource.get(ClientResponse.class);
ClientResponse test = builder.get(ClientResponse.class);
if (test.getStatus() == 200) {
rmAddress = host;
break;
@ -197,6 +195,11 @@ public class ApiServiceClient extends AppAdminClient {
return scheme+rmAddress;
}
List<String> getRMHAWebAddresses(Configuration conf) {
return RMHAUtils
.getRMHAWebappAddresses(new YarnConfiguration(conf));
}
/**
* Compute active resource manager API service location.
*

View File

@ -21,11 +21,19 @@ package org.apache.hadoop.yarn.service.client;
import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import javax.security.sasl.Sasl;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Map;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
@ -33,6 +41,13 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.log4j.Logger;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -55,29 +70,119 @@ public class TestSecureApiServiceClient extends KerberosSecurityTestcase {
private File keytabFile;
private Configuration conf = new Configuration();
private Configuration testConf = new Configuration();
private Map<String, String> props;
private static Server server;
private static Logger LOG = Logger
.getLogger(TestSecureApiServiceClient.class);
private ApiServiceClient asc;
/**
* A mocked version of API Service for testing purpose.
*
*/
@SuppressWarnings("serial")
public static class TestServlet extends HttpServlet {
private static boolean headerFound = false;
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
Enumeration<String> headers = req.getHeaderNames();
while(headers.hasMoreElements()) {
String header = headers.nextElement();
LOG.info(header);
}
if (req.getHeader("Authorization")!=null) {
headerFound = true;
resp.setStatus(HttpServletResponse.SC_OK);
} else {
headerFound = false;
resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
}
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
resp.setStatus(HttpServletResponse.SC_OK);
}
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
resp.setStatus(HttpServletResponse.SC_OK);
}
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
resp.setStatus(HttpServletResponse.SC_OK);
}
public static boolean isHeaderExist() {
return headerFound;
}
}
@Before
public void setUp() throws Exception {
keytabFile = new File(getWorkDir(), "keytab");
getKdc().createPrincipal(keytabFile, clientPrincipal, server1Principal,
server2Principal);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
UserGroupInformation.setConfiguration(conf);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS,
testConf);
UserGroupInformation.setConfiguration(testConf);
UserGroupInformation.setShouldRenewImmediatelyForTests(true);
props = new HashMap<String, String>();
props.put(Sasl.QOP, QualityOfProtection.AUTHENTICATION.saslQop);
server = new Server(8088);
((QueuedThreadPool)server.getThreadPool()).setMaxThreads(10);
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/app");
server.setHandler(context);
context.addServlet(new ServletHolder(TestServlet.class), "/*");
((ServerConnector)server.getConnectors()[0]).setHost("localhost");
server.start();
List<String> rmServers = new ArrayList<String>();
rmServers.add("localhost:8088");
testConf.set("yarn.resourcemanager.webapp.address",
"localhost:8088");
asc = new ApiServiceClient() {
@Override
List<String> getRMHAWebAddresses(Configuration conf) {
return rmServers;
}
};
asc.serviceInit(testConf);
}
@After
public void tearDown() throws Exception {
server.stop();
}
@Test
public void testHttpSpnegoChallenge() throws Exception {
UserGroupInformation.loginUserFromKeytab(clientPrincipal, keytabFile
.getCanonicalPath());
ApiServiceClient asc = new ApiServiceClient();
asc = new ApiServiceClient();
String challenge = asc.generateToken("localhost");
assertNotNull(challenge);
}
@Test
public void testAuthorizationHeader() throws Exception {
UserGroupInformation.loginUserFromKeytab(clientPrincipal, keytabFile
.getCanonicalPath());
String rmAddress = asc.getRMWebAddress();
if (TestServlet.isHeaderExist()) {
assertEquals(rmAddress, "http://localhost:8088");
} else {
fail("Did not see Authorization header.");
}
}
}