YARN-8435. Fix NPE when the same client simultaneously contact for the first time Yarn Router. Contributed by Rang Jiaheng.

This commit is contained in:
Giovanni Matteo Fumarola 2018-07-05 10:54:31 -07:00
parent 71df8c27c9
commit 0d9804dcef
6 changed files with 259 additions and 78 deletions

View File

@ -430,13 +430,15 @@ public class RouterClientRMService extends AbstractService
return pipeline.getRootInterceptor().getResourceTypeInfo(request); return pipeline.getRootInterceptor().getResourceTypeInfo(request);
} }
private RequestInterceptorChainWrapper getInterceptorChain() @VisibleForTesting
protected RequestInterceptorChainWrapper getInterceptorChain()
throws IOException { throws IOException {
String user = UserGroupInformation.getCurrentUser().getUserName(); String user = UserGroupInformation.getCurrentUser().getUserName();
if (!userPipelineMap.containsKey(user)) { RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
initializePipeline(user); if (chain != null && chain.getRootInterceptor() != null) {
return chain;
} }
return userPipelineMap.get(user); return initializePipeline(user);
} }
/** /**
@ -503,36 +505,33 @@ public class RouterClientRMService extends AbstractService
* *
* @param user * @param user
*/ */
private void initializePipeline(String user) { private RequestInterceptorChainWrapper initializePipeline(String user) {
RequestInterceptorChainWrapper chainWrapper = null;
synchronized (this.userPipelineMap) { synchronized (this.userPipelineMap) {
if (this.userPipelineMap.containsKey(user)) { if (this.userPipelineMap.containsKey(user)) {
LOG.info("Request to start an already existing user: {}" LOG.info("Request to start an already existing user: {}"
+ " was received, so ignoring.", user); + " was received, so ignoring.", user);
return; return userPipelineMap.get(user);
}
RequestInterceptorChainWrapper chainWrapper =
new RequestInterceptorChainWrapper();
try {
// We should init the pipeline instance after it is created and then
// add to the map, to ensure thread safe.
LOG.info("Initializing request processing pipeline for application "
+ "for the user: {}", user);
ClientRequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(user);
chainWrapper.init(interceptorChain);
} catch (Exception e) {
LOG.error("Init ClientRequestInterceptor error for user: " + user, e);
throw e;
} }
chainWrapper = new RequestInterceptorChainWrapper();
this.userPipelineMap.put(user, chainWrapper); this.userPipelineMap.put(user, chainWrapper);
} return chainWrapper;
// We register the pipeline instance in the map first and then initialize it
// later because chain initialization can be expensive and we would like to
// release the lock as soon as possible to prevent other applications from
// blocking when one application's chain is initializing
LOG.info("Initializing request processing pipeline for application "
+ "for the user: {}", user);
try {
ClientRequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(user);
chainWrapper.init(interceptorChain);
} catch (Exception e) {
synchronized (this.userPipelineMap) {
this.userPipelineMap.remove(user);
}
throw e;
} }
} }

View File

@ -165,13 +165,15 @@ public class RouterRMAdminService extends AbstractService
return interceptorClassNames; return interceptorClassNames;
} }
private RequestInterceptorChainWrapper getInterceptorChain() @VisibleForTesting
protected RequestInterceptorChainWrapper getInterceptorChain()
throws IOException { throws IOException {
String user = UserGroupInformation.getCurrentUser().getUserName(); String user = UserGroupInformation.getCurrentUser().getUserName();
if (!userPipelineMap.containsKey(user)) { RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
initializePipeline(user); if (chain != null && chain.getRootInterceptor() != null) {
return chain;
} }
return userPipelineMap.get(user); return initializePipeline(user);
} }
/** /**
@ -239,35 +241,32 @@ public class RouterRMAdminService extends AbstractService
* *
* @param user * @param user
*/ */
private void initializePipeline(String user) { private RequestInterceptorChainWrapper initializePipeline(String user) {
RequestInterceptorChainWrapper chainWrapper = null;
synchronized (this.userPipelineMap) { synchronized (this.userPipelineMap) {
if (this.userPipelineMap.containsKey(user)) { if (this.userPipelineMap.containsKey(user)) {
LOG.info("Request to start an already existing user: {}" LOG.info("Request to start an already existing user: {}"
+ " was received, so ignoring.", user); + " was received, so ignoring.", user);
return; return userPipelineMap.get(user);
}
RequestInterceptorChainWrapper chainWrapper =
new RequestInterceptorChainWrapper();
try {
// We should init the pipeline instance after it is created and then
// add to the map, to ensure thread safe.
LOG.info("Initializing request processing pipeline for user: {}", user);
RMAdminRequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(user);
chainWrapper.init(interceptorChain);
} catch (Exception e) {
LOG.error("Init RMAdminRequestInterceptor error for user: " + user, e);
throw e;
} }
chainWrapper = new RequestInterceptorChainWrapper();
this.userPipelineMap.put(user, chainWrapper); this.userPipelineMap.put(user, chainWrapper);
} return chainWrapper;
// We register the pipeline instance in the map first and then initialize it
// later because chain initialization can be expensive and we would like to
// release the lock as soon as possible to prevent other applications from
// blocking when one application's chain is initializing
LOG.info("Initializing request processing pipeline for the user: {}", user);
try {
RMAdminRequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(user);
chainWrapper.init(interceptorChain);
} catch (Exception e) {
synchronized (this.userPipelineMap) {
this.userPipelineMap.remove(user);
}
throw e;
} }
} }

View File

@ -173,10 +173,11 @@ public class RouterWebServices implements RMWebServiceProtocol {
} catch (IOException e) { } catch (IOException e) {
LOG.error("Cannot get user: {}", e.getMessage()); LOG.error("Cannot get user: {}", e.getMessage());
} }
if (!userPipelineMap.containsKey(user)) { RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
initializePipeline(user); if (chain != null && chain.getRootInterceptor() != null) {
return chain;
} }
return userPipelineMap.get(user); return initializePipeline(user);
} }
/** /**
@ -242,35 +243,32 @@ public class RouterWebServices implements RMWebServiceProtocol {
* *
* @param user * @param user
*/ */
private void initializePipeline(String user) { private RequestInterceptorChainWrapper initializePipeline(String user) {
RequestInterceptorChainWrapper chainWrapper = null;
synchronized (this.userPipelineMap) { synchronized (this.userPipelineMap) {
if (this.userPipelineMap.containsKey(user)) { if (this.userPipelineMap.containsKey(user)) {
LOG.info("Request to start an already existing user: {}" LOG.info("Request to start an already existing user: {}"
+ " was received, so ignoring.", user); + " was received, so ignoring.", user);
return; return userPipelineMap.get(user);
}
RequestInterceptorChainWrapper chainWrapper =
new RequestInterceptorChainWrapper();
try {
// We should init the pipeline instance after it is created and then
// add to the map, to ensure thread safe.
LOG.info("Initializing request processing pipeline for user: {}", user);
RESTRequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(user);
chainWrapper.init(interceptorChain);
} catch (Exception e) {
LOG.error("Init RESTRequestInterceptor error for user: " + user, e);
throw e;
} }
chainWrapper = new RequestInterceptorChainWrapper();
this.userPipelineMap.put(user, chainWrapper); this.userPipelineMap.put(user, chainWrapper);
} return chainWrapper;
// We register the pipeline instance in the map first and then initialize it
// later because chain initialization can be expensive and we would like to
// release the lock as soon as possible to prevent other applications from
// blocking when one application's chain is initializing
LOG.info("Initializing request processing pipeline for the user: {}", user);
try {
RESTRequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(user);
chainWrapper.init(interceptorChain);
} catch (Exception e) {
synchronized (this.userPipelineMap) {
this.userPipelineMap.remove(user);
}
throw e;
} }
} }

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.server.router.clientrm; package org.apache.hadoop.yarn.server.router.clientrm;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
@ -207,4 +209,62 @@ public class TestRouterClientRMService extends BaseRouterClientRMTest {
Assert.assertNull("test2 should have been evicted", chain); Assert.assertNull("test2 should have been evicted", chain);
} }
/**
* This test validates if the ClientRequestInterceptor chain for the user
* can build and init correctly when a multi-client process begins to
* request RouterClientRMService for the same user simultaneously.
*/
@Test
public void testClientPipelineConcurrent() throws InterruptedException {
final String user = "test1";
/*
* ClientTestThread is a thread to simulate a client request to get a
* ClientRequestInterceptor for the user.
*/
class ClientTestThread extends Thread {
private ClientRequestInterceptor interceptor;
@Override public void run() {
try {
interceptor = pipeline();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
private ClientRequestInterceptor pipeline()
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user).doAs(
new PrivilegedExceptionAction<ClientRequestInterceptor>() {
@Override
public ClientRequestInterceptor run() throws Exception {
RequestInterceptorChainWrapper wrapper =
getRouterClientRMService().getInterceptorChain();
ClientRequestInterceptor interceptor =
wrapper.getRootInterceptor();
Assert.assertNotNull(interceptor);
LOG.info("init client interceptor success for user " + user);
return interceptor;
}
});
}
}
/*
* We start the first thread. It should not finish initing a chainWrapper
* before the other thread starts. In this way, the second thread can
* init at the same time of the first one. In the end, we validate that
* the 2 threads get the same chainWrapper without going into error.
*/
ClientTestThread client1 = new ClientTestThread();
ClientTestThread client2 = new ClientTestThread();
client1.start();
client2.start();
client1.join();
client2.join();
Assert.assertNotNull(client1.interceptor);
Assert.assertNotNull(client2.interceptor);
Assert.assertTrue(client1.interceptor == client2.interceptor);
}
} }

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.server.router.rmadmin; package org.apache.hadoop.yarn.server.router.rmadmin;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
@ -216,4 +218,62 @@ public class TestRouterRMAdminService extends BaseRouterRMAdminTest {
Assert.assertNull("test2 should have been evicted", chain); Assert.assertNull("test2 should have been evicted", chain);
} }
/**
* This test validates if the RMAdminRequestInterceptor chain for the user
* can build and init correctly when a multi-client process begins to
* request RouterRMAdminService for the same user simultaneously.
*/
@Test
public void testRMAdminPipelineConcurrent() throws InterruptedException {
final String user = "test1";
/*
* ClientTestThread is a thread to simulate a client request to get a
* RMAdminRequestInterceptor for the user.
*/
class ClientTestThread extends Thread {
private RMAdminRequestInterceptor interceptor;
@Override public void run() {
try {
interceptor = pipeline();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
private RMAdminRequestInterceptor pipeline()
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user).doAs(
new PrivilegedExceptionAction<RMAdminRequestInterceptor>() {
@Override
public RMAdminRequestInterceptor run() throws Exception {
RequestInterceptorChainWrapper wrapper =
getRouterRMAdminService().getInterceptorChain();
RMAdminRequestInterceptor interceptor =
wrapper.getRootInterceptor();
Assert.assertNotNull(interceptor);
LOG.info("init rm admin interceptor success for user" + user);
return interceptor;
}
});
}
}
/*
* We start the first thread. It should not finish initing a chainWrapper
* before the other thread starts. In this way, the second thread can
* init at the same time of the first one. In the end, we validate that
* the 2 threads get the same chainWrapper without going into error.
*/
ClientTestThread client1 = new ClientTestThread();
ClientTestThread client2 = new ClientTestThread();
client1.start();
client2.start();
client1.join();
client2.join();
Assert.assertNotNull(client1.interceptor);
Assert.assertNotNull(client2.interceptor);
Assert.assertTrue(client1.interceptor == client2.interceptor);
}
} }

View File

@ -19,10 +19,12 @@
package org.apache.hadoop.yarn.server.router.webapp; package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map; import java.util.Map;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
@ -49,12 +51,17 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Test class to validate the WebService interceptor model inside the Router. * Test class to validate the WebService interceptor model inside the Router.
*/ */
public class TestRouterWebServices extends BaseRouterWebServicesTest { public class TestRouterWebServices extends BaseRouterWebServicesTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterWebServices.class);
private String user = "test1"; private String user = "test1";
/** /**
@ -266,4 +273,62 @@ public class TestRouterWebServices extends BaseRouterWebServicesTest {
Assert.assertNull("test2 should have been evicted", chain); Assert.assertNull("test2 should have been evicted", chain);
} }
/**
* This test validates if the RESTRequestInterceptor chain for the user
* can build and init correctly when a multi-client process begins to
* request RouterWebServices for the same user simultaneously.
*/
@Test
public void testWebPipelineConcurrent() throws InterruptedException {
final String user = "test1";
/*
* ClientTestThread is a thread to simulate a client request to get a
* RESTRequestInterceptor for the user.
*/
class ClientTestThread extends Thread {
private RESTRequestInterceptor interceptor;
@Override public void run() {
try {
interceptor = pipeline();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
private RESTRequestInterceptor pipeline()
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user).doAs(
new PrivilegedExceptionAction<RESTRequestInterceptor>() {
@Override
public RESTRequestInterceptor run() throws Exception {
RequestInterceptorChainWrapper wrapper =
getInterceptorChain(user);
RESTRequestInterceptor interceptor =
wrapper.getRootInterceptor();
Assert.assertNotNull(interceptor);
LOG.info("init web interceptor success for user" + user);
return interceptor;
}
});
}
}
/*
* We start the first thread. It should not finish initing a chainWrapper
* before the other thread starts. In this way, the second thread can
* init at the same time of the first one. In the end, we validate that
* the 2 threads get the same chainWrapper without going into error.
*/
ClientTestThread client1 = new ClientTestThread();
ClientTestThread client2 = new ClientTestThread();
client1.start();
client2.start();
client1.join();
client2.join();
Assert.assertNotNull(client1.interceptor);
Assert.assertNotNull(client2.interceptor);
Assert.assertTrue(client1.interceptor == client2.interceptor);
}
} }