YARN-2795. Fixed ResourceManager to not crash loading node-label data from HDFS in secure mode. Contributed by Wangda Tan.
This commit is contained in:
parent
237890feab
commit
ec6cbece8e
|
@ -849,6 +849,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
that were caused when adding log-upload-time via YARN-2703. (Xuan Gong via
|
that were caused when adding log-upload-time via YARN-2703. (Xuan Gong via
|
||||||
vinodkv)
|
vinodkv)
|
||||||
|
|
||||||
|
YARN-2795. Fixed ResourceManager to not crash loading node-label data from
|
||||||
|
HDFS in secure mode. (Wangda Tan via vinodkv)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -23,9 +23,7 @@ import java.io.InputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
@ -195,7 +193,17 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.rmContext = new RMContextImpl();
|
this.rmContext = new RMContextImpl();
|
||||||
|
|
||||||
|
// Set UGI and do login
|
||||||
|
// If security is enabled, use login user
|
||||||
|
// If security is not enabled, use current user
|
||||||
|
this.rmLoginUGI = UserGroupInformation.getCurrentUser();
|
||||||
|
try {
|
||||||
|
doSecureLogin();
|
||||||
|
} catch(IOException ie) {
|
||||||
|
throw new YarnRuntimeException("Failed to login", ie);
|
||||||
|
}
|
||||||
|
|
||||||
this.configurationProvider =
|
this.configurationProvider =
|
||||||
ConfigurationProviderFactory.getConfigurationProvider(conf);
|
ConfigurationProviderFactory.getConfigurationProvider(conf);
|
||||||
this.configurationProvider.init(this.conf);
|
this.configurationProvider.init(this.conf);
|
||||||
|
@ -242,14 +250,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
if (this.rmContext.isHAEnabled()) {
|
if (this.rmContext.isHAEnabled()) {
|
||||||
HAUtil.verifyAndSetConfiguration(this.conf);
|
HAUtil.verifyAndSetConfiguration(this.conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
createAndInitActiveServices();
|
createAndInitActiveServices();
|
||||||
|
|
||||||
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
|
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
|
||||||
YarnConfiguration.RM_BIND_HOST,
|
YarnConfiguration.RM_BIND_HOST,
|
||||||
WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
|
WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
|
||||||
|
|
||||||
this.rmLoginUGI = UserGroupInformation.getCurrentUser();
|
|
||||||
|
|
||||||
super.serviceInit(this.conf);
|
super.serviceInit(this.conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1019,17 +1026,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void transitionToActive() throws Exception {
|
synchronized void transitionToActive() throws Exception {
|
||||||
if (rmContext.getHAServiceState() ==
|
if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||||
HAServiceProtocol.HAServiceState.ACTIVE) {
|
|
||||||
LOG.info("Already in active state");
|
LOG.info("Already in active state");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Transitioning to active state");
|
LOG.info("Transitioning to active state");
|
||||||
|
|
||||||
// use rmLoginUGI to startActiveServices.
|
|
||||||
// in non-secure model, rmLoginUGI will be current UGI
|
|
||||||
// in secure model, rmLoginUGI will be LoginUser UGI
|
|
||||||
this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
|
this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void run() throws Exception {
|
public Void run() throws Exception {
|
||||||
|
@ -1071,12 +1074,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
try {
|
|
||||||
doSecureLogin();
|
|
||||||
} catch(IOException ie) {
|
|
||||||
throw new YarnRuntimeException("Failed to login", ie);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.rmContext.isHAEnabled()) {
|
if (this.rmContext.isHAEnabled()) {
|
||||||
transitionToStandby(true);
|
transitionToStandby(true);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1084,7 +1081,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
}
|
}
|
||||||
|
|
||||||
startWepApp();
|
startWepApp();
|
||||||
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
|
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
|
||||||
|
false)) {
|
||||||
int port = webApp.port();
|
int port = webApp.port();
|
||||||
WebAppUtils.setRMWebAppPort(conf, port);
|
WebAppUtils.setRMWebAppPort(conf, port);
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,8 +28,6 @@ import java.net.InetSocketAddress;
|
||||||
|
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -37,10 +35,10 @@ import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -54,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.codehaus.jettison.json.JSONException;
|
import org.codehaus.jettison.json.JSONException;
|
||||||
import org.codehaus.jettison.json.JSONObject;
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -64,7 +63,7 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
|
||||||
|
|
||||||
public class TestRMHA {
|
public class TestRMHA {
|
||||||
private Log LOG = LogFactory.getLog(TestRMHA.class);
|
private Log LOG = LogFactory.getLog(TestRMHA.class);
|
||||||
private final Configuration configuration = new YarnConfiguration();
|
private Configuration configuration;
|
||||||
private MockRM rm = null;
|
private MockRM rm = null;
|
||||||
private RMApp app = null;
|
private RMApp app = null;
|
||||||
private RMAppAttempt attempt = null;
|
private RMAppAttempt attempt = null;
|
||||||
|
@ -82,6 +81,8 @@ public class TestRMHA {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
|
configuration = new Configuration();
|
||||||
|
UserGroupInformation.setConfiguration(configuration);
|
||||||
configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + ","
|
configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + ","
|
||||||
+ RM2_NODE_ID);
|
+ RM2_NODE_ID);
|
||||||
|
|
|
@ -57,6 +57,7 @@ public class TestResourceManager {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
resourceManager = new ResourceManager();
|
resourceManager = new ResourceManager();
|
||||||
resourceManager.init(conf);
|
resourceManager.init(conf);
|
||||||
resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
|
resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
|
||||||
|
@ -254,7 +255,12 @@ public class TestResourceManager {
|
||||||
AuthenticationFilterInitializer.class.getName() + ", "
|
AuthenticationFilterInitializer.class.getName() + ", "
|
||||||
+ this.getClass().getName() };
|
+ this.getClass().getName() };
|
||||||
for (String filterInitializer : filterInitializers) {
|
for (String filterInitializer : filterInitializers) {
|
||||||
resourceManager = new ResourceManager();
|
resourceManager = new ResourceManager() {
|
||||||
|
@Override
|
||||||
|
protected void doSecureLogin() throws IOException {
|
||||||
|
// Skip the login.
|
||||||
|
}
|
||||||
|
};
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.set(filterInitializerConfKey, filterInitializer);
|
conf.set(filterInitializerConfKey, filterInitializer);
|
||||||
conf.set("hadoop.security.authentication", "kerberos");
|
conf.set("hadoop.security.authentication", "kerberos");
|
||||||
|
|
|
@ -808,7 +808,12 @@ public class TestDelegationTokenRenewer {
|
||||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
"kerberos");
|
"kerberos");
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
MockRM rm = new MockRM(conf);
|
MockRM rm = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
protected void doSecureLogin() throws IOException {
|
||||||
|
// Skip the login.
|
||||||
|
}
|
||||||
|
};
|
||||||
ByteBuffer tokens = ByteBuffer.wrap("BOGUS".getBytes());
|
ByteBuffer tokens = ByteBuffer.wrap("BOGUS".getBytes());
|
||||||
ContainerLaunchContext amContainer =
|
ContainerLaunchContext amContainer =
|
||||||
ContainerLaunchContext.newInstance(
|
ContainerLaunchContext.newInstance(
|
||||||
|
|
Loading…
Reference in New Issue