YARN-3749. We should make a copy of configuration when init
MiniYARNCluster with multiple RMs. Contributed by Chun Chen
(cherry picked from commit 5766a04428
)
This commit is contained in:
parent
25c1e54d3f
commit
3f0573b059
|
@ -433,6 +433,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha)
|
YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha)
|
||||||
|
|
||||||
|
YARN-3749. We should make a copy of configuration when init MiniYARNCluster
|
||||||
|
with multiple RMs. (Chun Chen via xgong)
|
||||||
|
|
||||||
Release 2.7.1 - UNRELEASED
|
Release 2.7.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -1967,7 +1967,7 @@ public class YarnConfiguration extends Configuration {
|
||||||
public InetSocketAddress updateConnectAddr(String name,
|
public InetSocketAddress updateConnectAddr(String name,
|
||||||
InetSocketAddress addr) {
|
InetSocketAddress addr) {
|
||||||
String prefix = name;
|
String prefix = name;
|
||||||
if (HAUtil.isHAEnabled(this)) {
|
if (HAUtil.isHAEnabled(this) && getServiceAddressConfKeys(this).contains(name)) {
|
||||||
prefix = HAUtil.addSuffix(prefix, HAUtil.getRMHAId(this));
|
prefix = HAUtil.addSuffix(prefix, HAUtil.getRMHAId(this));
|
||||||
}
|
}
|
||||||
return super.updateConnectAddr(prefix, addr);
|
return super.updateConnectAddr(prefix, addr);
|
||||||
|
|
|
@ -36,6 +36,7 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -101,7 +102,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
|
@ -161,26 +161,6 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
|
||||||
protected Thread failoverThread = null;
|
protected Thread failoverThread = null;
|
||||||
private volatile boolean keepRunning;
|
private volatile boolean keepRunning;
|
||||||
|
|
||||||
private void setConfForRM(String rmId, String prefix, String value) {
|
|
||||||
conf.set(HAUtil.addSuffix(prefix, rmId), value);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setRpcAddressForRM(String rmId, int base) {
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
|
||||||
"0.0.0.0:" + (base + YarnConfiguration
|
|
||||||
.DEFAULT_RM_RESOURCE_TRACKER_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws IOException {
|
public void setup() throws IOException {
|
||||||
failoverThread = null;
|
failoverThread = null;
|
||||||
|
@ -189,8 +169,8 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
|
||||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 5);
|
conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 5);
|
||||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||||
setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
|
HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf);
|
||||||
setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
|
HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf);
|
||||||
|
|
||||||
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
|
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
|
||||||
|
|
||||||
|
|
|
@ -54,18 +54,18 @@ public class TestApplicationMasterServiceProtocolOnHA
|
||||||
public void initialize() throws Exception {
|
public void initialize() throws Exception {
|
||||||
startHACluster(0, false, false, true);
|
startHACluster(0, false, false, true);
|
||||||
attemptId = this.cluster.createFakeApplicationAttemptId();
|
attemptId = this.cluster.createFakeApplicationAttemptId();
|
||||||
amClient = ClientRMProxy
|
|
||||||
.createRMProxy(this.conf, ApplicationMasterProtocol.class);
|
|
||||||
|
|
||||||
Token<AMRMTokenIdentifier> appToken =
|
Token<AMRMTokenIdentifier> appToken =
|
||||||
this.cluster.getResourceManager().getRMContext()
|
this.cluster.getResourceManager().getRMContext()
|
||||||
.getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
|
.getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
|
||||||
appToken.setService(ClientRMProxy.getAMRMTokenService(conf));
|
appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf));
|
||||||
UserGroupInformation.setLoginUser(UserGroupInformation
|
UserGroupInformation.setLoginUser(UserGroupInformation
|
||||||
.createRemoteUser(UserGroupInformation.getCurrentUser()
|
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
|
||||||
.getUserName()));
|
|
||||||
UserGroupInformation.getCurrentUser().addToken(appToken);
|
UserGroupInformation.getCurrentUser().addToken(appToken);
|
||||||
syncToken(appToken);
|
syncToken(appToken);
|
||||||
|
|
||||||
|
amClient = ClientRMProxy
|
||||||
|
.createRMProxy(this.conf, ApplicationMasterProtocol.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
|
@ -38,11 +38,11 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.service.Service.STATE;
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
|
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -66,34 +66,14 @@ public class TestRMFailover extends ClientBaseWithFixes {
|
||||||
private MiniYARNCluster cluster;
|
private MiniYARNCluster cluster;
|
||||||
private ApplicationId fakeAppId;
|
private ApplicationId fakeAppId;
|
||||||
|
|
||||||
|
|
||||||
private void setConfForRM(String rmId, String prefix, String value) {
|
|
||||||
conf.set(HAUtil.addSuffix(prefix, rmId), value);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setRpcAddressForRM(String rmId, int base) {
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws IOException {
|
public void setup() throws IOException {
|
||||||
fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
|
fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
|
||||||
conf = new YarnConfiguration();
|
conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||||
setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
|
HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf);
|
||||||
setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
|
HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf);
|
||||||
|
|
||||||
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
|
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
|
||||||
|
@ -202,5 +203,26 @@ public class TestYarnConfiguration {
|
||||||
serverAddress);
|
serverAddress);
|
||||||
|
|
||||||
assertTrue(resourceTrackerConnectAddress.toString().startsWith("yo.yo.yo"));
|
assertTrue(resourceTrackerConnectAddress.toString().startsWith("yo.yo.yo"));
|
||||||
|
|
||||||
|
//tests updateConnectAddr won't add suffix to NM service address configurations
|
||||||
|
conf = new YarnConfiguration();
|
||||||
|
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "yo.yo.yo");
|
||||||
|
conf.set(YarnConfiguration.NM_BIND_HOST, "0.0.0.0");
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
|
conf.set(YarnConfiguration.RM_HA_ID, "rm1");
|
||||||
|
|
||||||
|
serverAddress = new InetSocketAddress(
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS.split(":")[0],
|
||||||
|
Integer.valueOf(YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS.split(":")[1]));
|
||||||
|
|
||||||
|
InetSocketAddress localizerAddress = conf.updateConnectAddr(
|
||||||
|
YarnConfiguration.NM_BIND_HOST,
|
||||||
|
YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
|
||||||
|
serverAddress);
|
||||||
|
|
||||||
|
assertTrue(localizerAddress.toString().startsWith("yo.yo.yo"));
|
||||||
|
assertNull(conf.get(
|
||||||
|
HAUtil.addSuffix(YarnConfiguration.NM_LOCALIZER_ADDRESS, "rm1")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,7 +107,7 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
|
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
|
||||||
private final AMLivelinessMonitor amLivelinessMonitor;
|
private final AMLivelinessMonitor amLivelinessMonitor;
|
||||||
private YarnScheduler rScheduler;
|
private YarnScheduler rScheduler;
|
||||||
private InetSocketAddress bindAddress;
|
private InetSocketAddress masterServiceAddress;
|
||||||
private Server server;
|
private Server server;
|
||||||
private final RecordFactory recordFactory =
|
private final RecordFactory recordFactory =
|
||||||
RecordFactoryProvider.getRecordFactory(null);
|
RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
@ -123,15 +123,18 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
Configuration conf = getConfig();
|
masterServiceAddress = conf.getSocketAddr(
|
||||||
YarnRPC rpc = YarnRPC.create(conf);
|
|
||||||
|
|
||||||
InetSocketAddress masterServiceAddress = conf.getSocketAddr(
|
|
||||||
YarnConfiguration.RM_BIND_HOST,
|
YarnConfiguration.RM_BIND_HOST,
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
Configuration conf = getConfig();
|
||||||
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
|
|
||||||
Configuration serverConf = conf;
|
Configuration serverConf = conf;
|
||||||
// If the auth is not-simple, enforce it to be token-based.
|
// If the auth is not-simple, enforce it to be token-based.
|
||||||
|
@ -160,7 +163,7 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
this.server.start();
|
this.server.start();
|
||||||
this.bindAddress =
|
this.masterServiceAddress =
|
||||||
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
|
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
|
@ -170,7 +173,7 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public InetSocketAddress getBindAddress() {
|
public InetSocketAddress getBindAddress() {
|
||||||
return this.bindAddress;
|
return this.masterServiceAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
|
// Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
|
||||||
|
public class HATestUtil {
|
||||||
|
|
||||||
|
public static void setRpcAddressForRM(String rmId, int base,
|
||||||
|
Configuration conf) {
|
||||||
|
for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) {
|
||||||
|
setConfForRM(rmId, confKey, "0.0.0.0:" + (base +
|
||||||
|
YarnConfiguration.getRMDefaultPortNumber(confKey, conf)), conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setConfForRM(String rmId, String prefix, String value,
|
||||||
|
Configuration conf) {
|
||||||
|
conf.set(HAUtil.addSuffix(prefix, rmId), value);
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,7 +22,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
||||||
import org.apache.hadoop.ha.ServiceFailedException;
|
import org.apache.hadoop.ha.ServiceFailedException;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -42,25 +41,6 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private AtomicBoolean callbackCalled;
|
private AtomicBoolean callbackCalled;
|
||||||
|
|
||||||
private void setConfForRM(String rmId, String prefix, String value) {
|
|
||||||
conf.set(HAUtil.addSuffix(prefix, rmId), value);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setRpcAddressForRM(String rmId, int base) {
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT));
|
|
||||||
setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" +
|
|
||||||
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws IOException {
|
public void setup() throws IOException {
|
||||||
conf = new YarnConfiguration();
|
conf = new YarnConfiguration();
|
||||||
|
@ -73,8 +53,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
|
|
||||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
||||||
setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
|
HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf);
|
||||||
setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
|
HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf);
|
||||||
|
|
||||||
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
|
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
|
||||||
|
|
||||||
|
|
|
@ -287,10 +287,12 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void initResourceManager(int index, Configuration conf) {
|
private synchronized void initResourceManager(int index, Configuration conf) {
|
||||||
if (HAUtil.isHAEnabled(conf)) {
|
Configuration newConf = resourceManagers.length > 1 ?
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
|
new YarnConfiguration(conf) : conf;
|
||||||
|
if (HAUtil.isHAEnabled(newConf)) {
|
||||||
|
newConf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
|
||||||
}
|
}
|
||||||
resourceManagers[index].init(conf);
|
resourceManagers[index].init(newConf);
|
||||||
resourceManagers[index].getRMContext().getDispatcher().register(
|
resourceManagers[index].getRMContext().getDispatcher().register(
|
||||||
RMAppAttemptEventType.class,
|
RMAppAttemptEventType.class,
|
||||||
new EventHandler<RMAppAttemptEvent>() {
|
new EventHandler<RMAppAttemptEvent>() {
|
||||||
|
@ -329,10 +331,11 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
throw new YarnRuntimeException(t);
|
throw new YarnRuntimeException(t);
|
||||||
}
|
}
|
||||||
|
Configuration conf = resourceManagers[index].getConfig();
|
||||||
LOG.info("MiniYARN ResourceManager address: " +
|
LOG.info("MiniYARN ResourceManager address: " +
|
||||||
getConfig().get(YarnConfiguration.RM_ADDRESS));
|
conf.get(YarnConfiguration.RM_ADDRESS));
|
||||||
LOG.info("MiniYARN ResourceManager web address: " +
|
LOG.info("MiniYARN ResourceManager web address: " +
|
||||||
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
|
WebAppUtils.getRMWebAppURLWithoutScheme(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -352,7 +355,6 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
resourceManagers[index].stop();
|
resourceManagers[index].stop();
|
||||||
resourceManagers[index] = null;
|
resourceManagers[index] = null;
|
||||||
}
|
}
|
||||||
Configuration conf = getConfig();
|
|
||||||
resourceManagers[index] = new ResourceManager();
|
resourceManagers[index] = new ResourceManager();
|
||||||
initResourceManager(index, getConfig());
|
initResourceManager(index, getConfig());
|
||||||
startResourceManager(index);
|
startResourceManager(index);
|
||||||
|
@ -433,6 +435,7 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
private class ResourceManagerWrapper extends AbstractService {
|
private class ResourceManagerWrapper extends AbstractService {
|
||||||
private int index;
|
private int index;
|
||||||
|
|
||||||
|
|
||||||
public ResourceManagerWrapper(int i) {
|
public ResourceManagerWrapper(int i) {
|
||||||
super(ResourceManagerWrapper.class.getName() + "_" + i);
|
super(ResourceManagerWrapper.class.getName() + "_" + i);
|
||||||
index = i;
|
index = i;
|
||||||
|
@ -448,10 +451,11 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void serviceStart() throws Exception {
|
protected synchronized void serviceStart() throws Exception {
|
||||||
startResourceManager(index);
|
startResourceManager(index);
|
||||||
|
Configuration conf = resourceManagers[index].getConfig();
|
||||||
LOG.info("MiniYARN ResourceManager address: " +
|
LOG.info("MiniYARN ResourceManager address: " +
|
||||||
getConfig().get(YarnConfiguration.RM_ADDRESS));
|
conf.get(YarnConfiguration.RM_ADDRESS));
|
||||||
LOG.info("MiniYARN ResourceManager web address: " +
|
LOG.info("MiniYARN ResourceManager web address: " + WebAppUtils
|
||||||
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
|
.getRMWebAppURLWithoutScheme(conf));
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,9 @@
|
||||||
package org.apache.hadoop.yarn.server;
|
package org.apache.hadoop.yarn.server;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -112,4 +114,39 @@ public class TestMiniYarnCluster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiRMConf() {
|
||||||
|
String RM1_NODE_ID = "rm1", RM2_NODE_ID = "rm2";
|
||||||
|
int RM1_PORT_BASE = 10000, RM2_PORT_BASE = 20000;
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster");
|
||||||
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||||
|
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||||
|
HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf);
|
||||||
|
HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf);
|
||||||
|
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
|
||||||
|
|
||||||
|
MiniYARNCluster cluster =
|
||||||
|
new MiniYARNCluster(TestMiniYarnCluster.class.getName(),
|
||||||
|
2, 0, 1, 1);
|
||||||
|
cluster.init(conf);
|
||||||
|
Configuration conf1 = cluster.getResourceManager(0).getConfig(),
|
||||||
|
conf2 = cluster.getResourceManager(1).getConfig();
|
||||||
|
Assert.assertFalse(conf1 == conf2);
|
||||||
|
Assert.assertEquals("0.0.0.0:18032",
|
||||||
|
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID)));
|
||||||
|
Assert.assertEquals("0.0.0.0:28032",
|
||||||
|
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID)));
|
||||||
|
Assert.assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID));
|
||||||
|
|
||||||
|
Assert.assertEquals("0.0.0.0:18032",
|
||||||
|
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID)));
|
||||||
|
Assert.assertEquals("0.0.0.0:28032",
|
||||||
|
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID)));
|
||||||
|
Assert.assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue