diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
index a248f22cfc5..e2770030b11 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -68,7 +68,14 @@ public class RetryPolicies {
*
*/
public static final RetryPolicy RETRY_FOREVER = new RetryForever();
-
+
+ /**
+ *
+ * Keep failing over forever
+ *
+ */
+ public static final RetryPolicy FAILOVER_FOREVER = new FailoverForever();
+
/**
*
* Keep trying a limited number of times, waiting a fixed time between attempts,
@@ -166,6 +173,14 @@ public class RetryPolicies {
return RetryAction.RETRY;
}
}
+
+ static class FailoverForever implements RetryPolicy { // policy with no give-up condition
+ @Override
+ public RetryAction shouldRetry(Exception e, int retries, int failovers,
+ boolean isIdempotentOrAtMostOnce) throws Exception {
+ return RetryAction.FAILOVER_AND_RETRY; // same answer for every exception and every count
+ }
+ }
/**
* Retry up to maxRetries.
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e69ea1bda3a..e8e262caaab 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -52,6 +52,9 @@ Release 2.4.0 - UNRELEASED
YARN-312. Introduced ResourceManagerAdministrationProtocol changes to support
changing resources on node. (Junping Du via vinodkv)
+ YARN-1028. Added FailoverProxyProvider capability to ResourceManager to help
+ with RM failover. (Karthik Kambatla via vinodkv)
+
IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 80598a43bb8..486bebfec50 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -310,4 +310,12 @@
+
+
+
+
+
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index de420b05e35..e96c217b8cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -296,6 +296,31 @@ public class YarnConfiguration extends Configuration {
HttpConfig.isSecure() ? RM_WEBAPP_HTTPS_ADDRESS
: RM_WEBAPP_ADDRESS));
+ public static final String CLIENT_FAILOVER_PREFIX =
+ YARN_PREFIX + "client.failover-"; // prefix shared by the failover keys below
+ public static final String CLIENT_FAILOVER_PROXY_PROVIDER =
+ CLIENT_FAILOVER_PREFIX + "proxy-provider"; // key naming the provider class
+ public static final String DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER =
+ "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider";
+ // Max failover attempts; no default constant is declared alongside this key.
+ public static final String CLIENT_FAILOVER_MAX_ATTEMPTS =
+ CLIENT_FAILOVER_PREFIX + "max-attempts";
+ // Base sleep (ms) between failovers; likewise declared without a default.
+ public static final String CLIENT_FAILOVER_SLEEPTIME_BASE_MS =
+ CLIENT_FAILOVER_PREFIX + "sleep-base-ms";
+ // Maximum sleep (ms) between failovers; likewise declared without a default.
+ public static final String CLIENT_FAILOVER_SLEEPTIME_MAX_MS =
+ CLIENT_FAILOVER_PREFIX + "sleep-max-ms";
+ // Connect retries per attempt while failing over.
+ public static final String CLIENT_FAILOVER_RETRIES =
+ CLIENT_FAILOVER_PREFIX + "retries";
+ public static final int DEFAULT_CLIENT_FAILOVER_RETRIES = 0;
+ // Connect retries per attempt on socket timeouts while failing over.
+ public static final String CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS =
+ CLIENT_FAILOVER_PREFIX + "retries-on-socket-timeouts";
+ public static final int
+ DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS = 0;
+
////////////////////////////////
// RM state store configs
////////////////////////////////
@@ -850,22 +875,31 @@ public class YarnConfiguration extends Configuration {
public static final String IS_MINI_YARN_CLUSTER = YARN_PREFIX
+ "is.minicluster";
+ public static final String YARN_MC_PREFIX = YARN_PREFIX + "minicluster.";
+
/** Whether to use fixed ports with the minicluster. */
- public static final String YARN_MINICLUSTER_FIXED_PORTS = YARN_PREFIX
- + "minicluster.fixed.ports";
+ public static final String YARN_MINICLUSTER_FIXED_PORTS =
+ YARN_MC_PREFIX + "fixed.ports";
/**
* Default is false to be able to run tests concurrently without port
* conflicts.
*/
- public static boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false;
+ public static final boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false;
+
+ /**
+ * Whether the NM should use RPC to connect to the RM. Default is false.
+ * Can be set to true only when using fixed ports.
+ */
+ public static final String YARN_MINICLUSTER_USE_RPC = YARN_MC_PREFIX + "use-rpc";
+ public static final boolean DEFAULT_YARN_MINICLUSTER_USE_RPC = false;
/**
* Whether users are explicitly trying to control resource monitoring
* configuration for the MiniYARNCluster. Disabled by default.
*/
public static final String YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING =
- YARN_PREFIX + "minicluster.control-resource-monitoring";
+ YARN_MC_PREFIX + "control-resource-monitoring";
public static final boolean
DEFAULT_YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING = false;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
index 049f4cc8266..06bbc3555c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -34,17 +35,37 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
-public class ClientRMProxy extends RMProxy {
+import com.google.common.base.Preconditions;
+public class ClientRMProxy extends RMProxy {
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
+ private interface ClientRMProtocols extends ApplicationClientProtocol,
+ ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
+ // Add nothing
+ }
+
+ static {
+ INSTANCE = new ClientRMProxy();
+ }
+
+ private ClientRMProxy(){
+ super();
+ }
+
+ /**
+ * Create a proxy to the ResourceManager for the specified protocol.
+ * @param configuration Configuration with all the required information.
+ * @param protocol Client protocol for which proxy is being requested.
+ * @param <T> Type of proxy.
+ * @return Proxy to the ResourceManager for the specified client protocol.
+ * @throws IOException
+ */
public static T createRMProxy(final Configuration configuration,
final Class protocol) throws IOException {
- YarnConfiguration conf = (configuration instanceof YarnConfiguration)
- ? (YarnConfiguration) configuration
- : new YarnConfiguration(configuration);
- InetSocketAddress rmAddress = getRMAddress(conf, protocol);
- return createRMProxy(conf, protocol, rmAddress);
+ // This method exists only to trigger this class' static initializer, which
+ // sets INSTANCE, before delegating to RMProxy. TODO: FIX if possible
+ return RMProxy.createRMProxy(configuration, protocol);
}
private static void setupTokens(InetSocketAddress resourceManagerAddress)
@@ -63,7 +84,9 @@ public class ClientRMProxy extends RMProxy {
}
}
- private static InetSocketAddress getRMAddress(YarnConfiguration conf,
+ @InterfaceAudience.Private
+ @Override
+ protected InetSocketAddress getRMAddress(YarnConfiguration conf,
Class> protocol) throws IOException {
if (protocol == ApplicationClientProtocol.class) {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
@@ -89,4 +112,12 @@ public class ClientRMProxy extends RMProxy {
throw new IllegalStateException(message);
}
}
+
+  @InterfaceAudience.Private
+  @Override
+  protected void checkAllowedProtocols(Class<?> protocol) {
+    // Allowed only if protocol is a supertype of ClientRMProtocols.
+    Preconditions.checkArgument(protocol.isAssignableFrom(
+        ClientRMProtocols.class), "RM does not support this client protocol");
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 7c446045fe5..a5ff9f67dc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -79,7 +78,6 @@ public class YarnClientImpl extends YarnClient {
private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
protected ApplicationClientProtocol rmClient;
- protected InetSocketAddress rmAddress;
protected long submitPollIntervalMillis;
private long asyncApiPollIntervalMillis;
@@ -89,15 +87,9 @@ public class YarnClientImpl extends YarnClient {
super(YarnClientImpl.class.getName());
}
- private static InetSocketAddress getRmAddress(Configuration conf) {
- return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
- }
-
@SuppressWarnings("deprecation")
@Override
protected void serviceInit(Configuration conf) throws Exception {
- this.rmAddress = getRmAddress(conf);
asyncApiPollIntervalMillis =
conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
@@ -180,9 +172,7 @@ public class YarnClientImpl extends YarnClient {
}
}
-
- LOG.info("Submitted application " + applicationId + " to ResourceManager"
- + " at " + rmAddress);
+ LOG.info("Submitted application " + applicationId);
return applicationId;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
new file mode 100644
index 00000000000..8545a1a2839
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Verifies that YARN clients and NMs follow explicit RM failovers. */
+public class TestRMFailover {
+  private static final Log LOG =
+      LogFactory.getLog(TestRMFailover.class.getName());
+
+  private static final String RM1_NODE_ID = "rm1";
+  private static final int RM1_PORT_BASE = 10000;
+  private static final String RM2_NODE_ID = "rm2";
+  private static final int RM2_PORT_BASE = 20000;
+  private static final HAServiceProtocol.StateChangeRequestInfo req =
+      new HAServiceProtocol.StateChangeRequestInfo(
+          HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED);
+
+  private static Configuration conf;
+  private static MiniYARNCluster cluster;
+
+  private static void setConfForRM(String rmId, String prefix, String value) {
+    conf.set(HAUtil.addSuffix(prefix, rmId), value);
+  }
+
+  private static void setRpcAddressForRM(String rmId, int base) {
+    setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_PORT));
+    setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT));
+    setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
+    setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT));
+    setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT));
+    setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
+  }
+
+  private static AdminService getRMAdminService(int index) {
+    return
+        cluster.getResourceManager(index).getRMContext().getRMAdminService();
+  }
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
+    setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
+    setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
+
+    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 100);
+    conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
+    conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS, 1000L);
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
+
+    cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 1, 1, 1);
+    cluster.init(conf);
+    cluster.start();
+
+    // Start with the first RM active.
+    getRMAdminService(0).transitionToActive(req);
+    assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+  }
+
+  @AfterClass
+  public static void teardown() {
+    // cluster stays null when setup() fails early; don't mask that failure.
+    if (cluster != null) {
+      cluster.stop();
+    }
+  }
+
+  private void verifyClientConnection() {
+    for (int attempt = 0; attempt < 3; attempt++) {
+      // Private copy so each client starts from the shared test settings.
+      Configuration clientConf = new YarnConfiguration(TestRMFailover.conf);
+      YarnClient client = YarnClient.createYarnClient();
+      client.init(clientConf);
+      client.start();
+      try {
+        client.getApplications();
+        return;
+      } catch (Exception e) {
+        LOG.error(e);
+      } finally {
+        client.stop();
+      }
+    }
+    fail("Client couldn't connect to the Active RM");
+  }
+
+  /** Moves the active role between RMs, then checks NMs and a client. */
+  private void failover(int fromIndex, int toIndex)
+      throws YarnException, InterruptedException, IOException {
+    getRMAdminService(fromIndex).transitionToStandby(req);
+    getRMAdminService(toIndex).transitionToActive(req);
+    assertEquals("Wrong ResourceManager is active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        getRMAdminService(toIndex).getServiceStatus().getState());
+    assertTrue("NMs failed to connect to the RM",
+        cluster.waitForNodeManagersToConnect(5000));
+    verifyClientConnection();
+  }
+
+  @Test
+  public void testExplicitFailover()
+      throws YarnException, InterruptedException, IOException {
+    assertTrue("NMs failed to connect to the RM",
+        cluster.waitForNodeManagersToConnect(5000));
+    verifyClientConnection();
+
+    failover(0, 1);  // fail over to the second RM
+    failover(1, 0);  // fail back to the first RM
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
new file mode 100644
index 00000000000..ef56edd4293
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/** Cycles through the configured RM ids, keeping one cached proxy per id. */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ConfiguredRMFailoverProxyProvider<T>
+    implements RMFailoverProxyProvider<T> {
+  private static final Log LOG =
+      LogFactory.getLog(ConfiguredRMFailoverProxyProvider.class);
+
+  private int currentProxyIndex = 0;
+  Map<String, T> proxies = new HashMap<String, T>();
+  private RMProxy rmProxy;
+  private Class<T> protocol;
+  protected YarnConfiguration conf;
+  protected String[] rmServiceIds;
+
+  @Override
+  public void init(Configuration configuration, RMProxy rmProxy,
+      Class<T> protocol) {
+    this.rmProxy = rmProxy;
+    this.protocol = protocol;
+    this.rmProxy.checkAllowedProtocols(this.protocol);
+    this.conf = new YarnConfiguration(configuration);
+    Collection<String> rmIds = HAUtil.getRMHAIds(conf);
+    this.rmServiceIds = rmIds.toArray(new String[rmIds.size()]);
+    conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]);
+    // Per-RM connect retries come from the client failover settings.
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+        conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES,
+            YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES));
+
+    conf.setInt(CommonConfigurationKeysPublic.
+        IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+        conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
+            YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
+  }
+
+  private T getProxyInternal() {
+    try {
+      final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
+      return RMProxy.getProxy(conf, protocol, rmAddress);
+    } catch (IOException ioe) {
+      LOG.error("Unable to create proxy to the ResourceManager " +
+          rmServiceIds[currentProxyIndex], ioe);
+      return null;
+    }
+  }
+
+  @Override
+  public synchronized T getProxy() {
+    String rmId = rmServiceIds[currentProxyIndex];
+    T current = proxies.get(rmId);
+    if (current == null) {
+      current = getProxyInternal();
+      proxies.put(rmId, current);
+    }
+    return current;
+  }
+
+  @Override
+  public synchronized void performFailover(T currentProxy) {
+    currentProxyIndex = (currentProxyIndex + 1) % rmServiceIds.length;
+    conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]);
+    LOG.info("Failing over to " + rmServiceIds[currentProxyIndex]);
+  }
+
+  @Override
+  public Class<T> getInterface() {
+    return protocol;
+  }
+
+  /**
+   * Close every proxy opened over the lifetime of this provider. Entries left
+   * null by a failed creation in getProxy() are skipped.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    for (T proxy : proxies.values()) {
+      if (proxy instanceof Closeable) {
+        ((Closeable)proxy).close();
+      } else if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java
new file mode 100644
index 00000000000..63b4764ab5a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+
+@InterfaceAudience.Private
+public interface RMFailoverProxyProvider<T> extends FailoverProxyProvider<T> {
+  /**
+   * Initialize internal data structures, invoked right after instantiation.
+   *
+   * @param conf Configuration to use
+   * @param proxy The {@link RMProxy} instance to use
+   * @param protocol The communication protocol to use
+   */
+  public void init(Configuration conf, RMProxy proxy, Class<T> protocol);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
index 5fff760eb2d..1651c13100c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -48,7 +50,68 @@ import com.google.common.annotations.VisibleForTesting;
public class RMProxy {
private static final Log LOG = LogFactory.getLog(RMProxy.class);
+ protected static RMProxy INSTANCE;
+ protected RMProxy() {}
+
+  /**
+   * Verify the passed protocol is supported. The base class accepts any.
+   */
+  @Private
+  protected void checkAllowedProtocols(Class<?> protocol) {}
+
+  /**
+   * Get the ResourceManager address from the provided Configuration for the
+   * given protocol. The base class always throws; ClientRMProxy overrides it.
+   */
+  @Private
+  protected InetSocketAddress getRMAddress(
+      YarnConfiguration conf, Class<?> protocol) throws IOException {
+    throw new UnsupportedOperationException("This method should be invoked " +
+        "from an instance of ClientRMProxy or ServerRMProxy");
+  }
+
+  /**
+   * Create a proxy for the specified protocol. Without HA this is a direct
+   * connection to the ResourceManager address; with HA enabled the proxy also
+   * handles failover between the ResourceManagers. Relies on INSTANCE, which
+   * a subclass static initializer (e.g. ClientRMProxy) must have set.
+   */
+  @Private
+  protected static <T> T createRMProxy(final Configuration configuration,
+      final Class<T> protocol) throws IOException {
+    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
+        ? (YarnConfiguration) configuration
+        : new YarnConfiguration(configuration);
+    RetryPolicy retryPolicy = createRetryPolicy(conf);
+    if (HAUtil.isHAEnabled(conf)) {
+      RMFailoverProxyProvider provider =
+          INSTANCE.createRMFailoverProxyProvider(conf, protocol);
+      return (T) RetryProxy.create(protocol, provider, retryPolicy);
+    } else {
+      InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol);
+      LOG.info("Connecting to ResourceManager at " + rmAddress);
+      T proxy = RMProxy.getProxy(conf, protocol, rmAddress);
+      return (T) RetryProxy.create(protocol, proxy, retryPolicy);
+    }
+  }
+
+ /**
+ * @deprecated
+ * This method is deprecated and is not used by YARN internally any more.
+ * To create a proxy to the RM, use ClientRMProxy#createRMProxy or
+ * ServerRMProxy#createRMProxy.
+ *
+ * Create a proxy to the ResourceManager at the specified address.
+ *
+ * @param conf Configuration to generate retry policy
+ * @param protocol Protocol for the proxy
+ * @param rmAddress Address of the ResourceManager
+ * @param <T> Type information of the proxy
+ * @return Proxy to the RM
+ * @throws IOException
+ */
+ @Deprecated
public static T createRMProxy(final Configuration conf,
final Class protocol, InetSocketAddress rmAddress) throws IOException {
RetryPolicy retryPolicy = createRetryPolicy(conf);
@@ -57,12 +120,16 @@ public class RMProxy {
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
}
- private static T getProxy(final Configuration conf,
+ /**
+ * Get a proxy to the RM at the specified address. To be used to create a
+ * RetryProxy.
+ */
+ @Private
+ static T getProxy(final Configuration conf,
final Class protocol, final InetSocketAddress rmAddress)
throws IOException {
return UserGroupInformation.getCurrentUser().doAs(
new PrivilegedAction() {
-
@Override
public T run() {
return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
@@ -70,6 +137,50 @@ public class RMProxy {
});
}
+  /**
+   * Instantiate and initialize the configured RMFailoverProxyProvider.
+   */
+  private RMFailoverProxyProvider createRMFailoverProxyProvider(
+      Configuration conf, Class protocol) {
+    Class<? extends RMFailoverProxyProvider> defaultProviderClass;
+    try {
+      defaultProviderClass = (Class<? extends RMFailoverProxyProvider>)
+          Class.forName(
+              YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(
+          "Invalid default failover provider class " +
+          YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
+    }
+    RMFailoverProxyProvider provider = ReflectionUtils.newInstance(
+        conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
+            defaultProviderClass, RMFailoverProxyProvider.class), conf);
+    provider.init(conf, (RMProxy) this, protocol);
+    return provider;
+  }
+
+  /**
+   * A RetryPolicy that keeps failing over until an absolute deadline passes.
+   */
+  private static class FailoverUptoMaximumTimePolicy implements RetryPolicy {
+    // Wall-clock deadline, compared against System.currentTimeMillis().
+    private final long maxTime;
+
+    FailoverUptoMaximumTimePolicy(long maxTime) {
+      this.maxTime = maxTime;
+    }
+
+    @Override
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isIdempotentOrAtMostOnce) throws Exception {
+      return System.currentTimeMillis() < maxTime
+          ? RetryAction.FAILOVER_AND_RETRY : RetryAction.FAIL;
+    }
+  }
+
+ /**
+ * Fetch retry policy from Configuration
+ */
@Private
@VisibleForTesting
public static RetryPolicy createRetryPolicy(Configuration conf) {
@@ -81,19 +192,10 @@ public class RMProxy {
conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
YarnConfiguration
- .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
-
- if (rmConnectionRetryIntervalMS < 0) {
- throw new YarnRuntimeException("Invalid Configuration. " +
- YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
- " should not be negative.");
- }
+ .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
boolean waitForEver = (rmConnectWaitMS == -1);
-
- if (waitForEver) {
- return RetryPolicies.RETRY_FOREVER;
- } else {
+ if (!waitForEver) {
if (rmConnectWaitMS < 0) {
throw new YarnRuntimeException("Invalid Configuration. "
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
@@ -110,18 +212,54 @@ public class RMProxy {
}
}
+ // Handle HA case first
+ if (HAUtil.isHAEnabled(conf)) {
+ final long failoverSleepBaseMs = conf.getLong(
+ YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
+ rmConnectionRetryIntervalMS);
+
+ final long failoverSleepMaxMs = conf.getLong(
+ YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
+ rmConnectionRetryIntervalMS);
+
+ int maxFailoverAttempts = conf.getInt(
+ YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
+
+ RetryPolicy basePolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
+ if (maxFailoverAttempts == -1) {
+ if (waitForEver) {
+ basePolicy = RetryPolicies.FAILOVER_FOREVER;
+ } else {
+ basePolicy = new FailoverUptoMaximumTimePolicy(
+ System.currentTimeMillis() + rmConnectWaitMS);
+ }
+ maxFailoverAttempts = 0;
+ }
+
+ return RetryPolicies.failoverOnNetworkException(basePolicy,
+ maxFailoverAttempts, failoverSleepBaseMs, failoverSleepMaxMs);
+ }
+
+ if (waitForEver) {
+ return RetryPolicies.RETRY_FOREVER;
+ }
+
+ if (rmConnectionRetryIntervalMS < 0) {
+ throw new YarnRuntimeException("Invalid Configuration. " +
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
+ " should not be negative.");
+ }
+
RetryPolicy retryPolicy =
RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
- rmConnectionRetryIntervalMS,
- TimeUnit.MILLISECONDS);
+ rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
Map, RetryPolicy> exceptionToPolicyMap =
new HashMap, RetryPolicy>();
exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
//TO DO: after HADOOP-9576, IOException can be changed to EOFException
exceptionToPolicyMap.put(IOException.class, retryPolicy);
-
- return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
- exceptionToPolicyMap);
+ return RetryPolicies.retryByException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 9673826c2ae..f13d92be1c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -424,6 +424,61 @@
+  <property>
+    <description>When HA is enabled, the class to be used by Clients, AMs and
+      NMs to failover to the Active RM. It should extend
+      org.apache.hadoop.yarn.client.RMFailoverProxyProvider</description>
+    <name>yarn.client.failover-proxy-provider</name>
+    <value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value>
+  </property>
+
+  <property>
+    <description>When HA is enabled, the max number of times
+      FailoverProxyProvider should attempt failover. When set,
+      this overrides the yarn.resourcemanager.connect.max-wait.ms. When
+      not set, this is inferred from
+      yarn.resourcemanager.connect.max-wait.ms.</description>
+    <name>yarn.client.failover-max-attempts</name>
+
+  </property>
+
+  <property>
+    <description>When HA is enabled, the sleep base (in milliseconds) to be
+      used for calculating the exponential delay between failovers. When set,
+      this overrides the yarn.resourcemanager.connect.* settings. When
+      not set, yarn.resourcemanager.connect.retry-interval.ms is used instead.
+    </description>
+    <name>yarn.client.failover-sleep-base-ms</name>
+
+  </property>
+
+  <property>
+    <description>When HA is enabled, the maximum sleep time (in milliseconds)
+      between failovers. When set, this overrides the
+      yarn.resourcemanager.connect.* settings. When not set,
+      yarn.resourcemanager.connect.retry-interval.ms is used instead.</description>
+    <name>yarn.client.failover-sleep-max-ms</name>
+
+  </property>
+
+  <property>
+    <description>When HA is enabled, the number of retries per
+      attempt to connect to a ResourceManager. In other words,
+      it is the ipc.client.connect.max.retries to be used during
+      failover attempts.</description>
+    <name>yarn.client.failover-retries</name>
+    <value>0</value>
+  </property>
+
+  <property>
+    <description>When HA is enabled, the number of retries per
+      attempt to connect to a ResourceManager on socket timeouts. In other
+      words, it is the ipc.client.connect.max.retries.on.timeouts to be used
+      during failover attempts.</description>
+    <name>yarn.client.failover-retries-on-socket-timeouts</name>
+    <value>0</value>
+  </property>
+
The maximum number of completed applications RM keeps.
yarn.resourcemanager.max-completed-applications
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
index c25c5977b80..15a26e51260 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
@@ -23,25 +23,43 @@ import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-public class ServerRMProxy extends RMProxy {
+import com.google.common.base.Preconditions;
+public class ServerRMProxy extends RMProxy {
private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
- public static T createRMProxy(final Configuration configuration,
- final Class protocol) throws IOException {
- YarnConfiguration conf = (configuration instanceof YarnConfiguration)
- ? (YarnConfiguration) configuration
- : new YarnConfiguration(configuration);
- InetSocketAddress rmAddress = getRMAddress(conf, protocol);
- return createRMProxy(conf, protocol, rmAddress);
+ static {
+ INSTANCE = new ServerRMProxy();
}
- private static InetSocketAddress getRMAddress(YarnConfiguration conf,
- Class> protocol) {
+ private ServerRMProxy() {
+ super();
+ }
+
+ /**
+ * Create a proxy to the ResourceManager for the specified protocol.
+ * @param configuration Configuration with all the required information.
+ * @param protocol Server protocol for which proxy is being requested.
+ * @param <T> Type of proxy.
+ * @return Proxy to the ResourceManager for the specified server protocol.
+ * @throws IOException
+ */
+ public static T createRMProxy(final Configuration configuration,
+ final Class protocol) throws IOException {
+ // This method exists only to trigger this class's static initializer, which
+ // sets INSTANCE. TODO: fix if possible.
+ return RMProxy.createRMProxy(configuration, protocol);
+ }
+
+ @InterfaceAudience.Private
+ @Override
+ protected InetSocketAddress getRMAddress(YarnConfiguration conf,
+ Class> protocol) {
if (protocol == ResourceTracker.class) {
return conf.getSocketAddr(
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
@@ -55,4 +73,12 @@ public class ServerRMProxy extends RMProxy {
throw new IllegalStateException(message);
}
}
+
+ @InterfaceAudience.Private
+ @Override
+ protected void checkAllowedProtocols(Class> protocol) {
+ Preconditions.checkArgument(
+ protocol.isAssignableFrom(ResourceTracker.class),
+ "ResourceManager does not support this protocol");
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 54de419a63d..78bbea43852 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -38,6 +39,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -65,6 +67,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import static org.junit.Assert.fail;
+
/**
* Embedded Yarn minicluster for testcases that need to interact with a cluster.
*
@@ -91,9 +95,11 @@ public class MiniYARNCluster extends CompositeService {
private NodeManager[] nodeManagers;
private ResourceManager[] resourceManagers;
+ private String[] rmIds;
+
+ private boolean useFixedPorts;
+ private boolean useRpc = false;
- private ResourceManagerWrapper resourceManagerWrapper;
-
private ConcurrentMap appMasters =
new ConcurrentHashMap(16, 0.75f, 2);
@@ -163,15 +169,7 @@ public class MiniYARNCluster extends CompositeService {
}
resourceManagers = new ResourceManager[numResourceManagers];
- for (int i = 0; i < numResourceManagers; i++) {
- resourceManagers[i] = new ResourceManager();
- addService(new ResourceManagerWrapper(i));
- }
- nodeManagers = new CustomNodeManager[numNodeManagers];
- for(int index = 0; index < numNodeManagers; index++) {
- addService(new NodeManagerWrapper(index));
- nodeManagers[index] = new CustomNodeManager();
- }
+ nodeManagers = new NodeManager[numNodeManagers];
}
/**
@@ -185,20 +183,45 @@ public class MiniYARNCluster extends CompositeService {
this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs);
}
- @Override
+ @Override
public void serviceInit(Configuration conf) throws Exception {
+ useFixedPorts = conf.getBoolean(
+ YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
+ YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
+ useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
+ YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
+
+ if (useRpc && !useFixedPorts) {
+ throw new YarnRuntimeException("Invalid configuration!" +
+ " Minicluster can use rpc only when configured to use fixed ports");
+ }
+
if (resourceManagers.length > 1) {
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
-
- StringBuilder rmIds = new StringBuilder();
- for (int i = 0; i < resourceManagers.length; i++) {
- if (i != 0) {
- rmIds.append(",");
+ if (conf.get(YarnConfiguration.RM_HA_IDS) == null) {
+ StringBuilder rmIds = new StringBuilder();
+ for (int i = 0; i < resourceManagers.length; i++) {
+ if (i != 0) {
+ rmIds.append(",");
+ }
+ rmIds.append("rm" + i);
}
- rmIds.append("rm" + i);
+ conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
}
- conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
+ Collection rmIdsCollection = HAUtil.getRMHAIds(conf);
+ rmIds = rmIdsCollection.toArray(new String[rmIdsCollection.size()]);
}
+
+ for (int i = 0; i < resourceManagers.length; i++) {
+ resourceManagers[i] = new ResourceManager();
+ addService(new ResourceManagerWrapper(i));
+ }
+ for(int index = 0; index < nodeManagers.length; index++) {
+ nodeManagers[index] =
+ useRpc ? new CustomNodeManager() : new ShortCircuitedNodeManager();
+ addService(new NodeManagerWrapper(index));
+ }
+
super.serviceInit(
conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
}
@@ -213,11 +236,12 @@ public class MiniYARNCluster extends CompositeService {
*
* In an non-HA cluster, return the index of the only RM.
*
- * @return index of the active RM
+ * @return index of the active RM, or -1 if none of them transitions to
+ * active even after 5 seconds of waiting
*/
@InterfaceAudience.Private
@VisibleForTesting
- int getActiveRMIndex() {
+ public int getActiveRMIndex() {
if (resourceManagers.length == 1) {
return 0;
}
@@ -292,9 +316,7 @@ public class MiniYARNCluster extends CompositeService {
}
private void setHARMConfiguration(Configuration conf) {
- String rmId = "rm" + index;
String hostname = MiniYARNCluster.getHostname();
- conf.set(YarnConfiguration.RM_HA_ID, rmId);
for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
for (String id : HAUtil.getRMHAIds(conf)) {
conf.set(HAUtil.addSuffix(confKey, id), hostname + ":0");
@@ -306,15 +328,17 @@ public class MiniYARNCluster extends CompositeService {
protected synchronized void serviceInit(Configuration conf)
throws Exception {
conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
- if (!conf.getBoolean(
- YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
- YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
+
+ if (!useFixedPorts) {
if (HAUtil.isHAEnabled(conf)) {
setHARMConfiguration(conf);
} else {
setNonHARMConfiguration(conf);
}
}
+ if (HAUtil.isHAEnabled(conf)) {
+ conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
+ }
resourceManagers[index].init(conf);
resourceManagers[index].getRMContext().getDispatcher().register
(RMAppAttemptEventType.class,
@@ -500,7 +524,9 @@ public class MiniYARNCluster extends CompositeService {
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcase.
}
+ }
+ private class ShortCircuitedNodeManager extends CustomNodeManager {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
@@ -553,4 +579,28 @@ public class MiniYARNCluster extends CompositeService {
};
}
}
+
+ /**
+ * Wait for all the NodeManagers to connect to the ResourceManager.
+ *
+ * @param timeout Maximum time to wait, in milliseconds (polled every 100 ms).
+ * @return true if all NodeManagers connect to the (Active)
+ * ResourceManager, false otherwise.
+ * @throws YarnException if querying the ResourceManager fails
+ * @throws InterruptedException if interrupted while sleeping between polls
+ */
+ public boolean waitForNodeManagersToConnect(long timeout)
+ throws YarnException, InterruptedException {
+ ResourceManager rm = getResourceManager();
+ GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
+
+ for (int i = 0; i < timeout / 100; i++) {
+ if (nodeManagers.length == rm.getClientRMService().getClusterMetrics(req)
+ .getClusterMetrics().getNumNodeManagers()) {
+ return true;
+ }
+ Thread.sleep(100);
+ }
+ return false;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
index f62124e5d39..05266858a22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestMiniYARNClusterForHA {
@@ -56,16 +57,7 @@ public class TestMiniYARNClusterForHA {
@Test
public void testClusterWorks() throws YarnException, InterruptedException {
- ResourceManager rm = cluster.getResourceManager(0);
- GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
-
- for (int i = 0; i < 600; i++) {
- if (1 == rm.getClientRMService().getClusterMetrics(req)
- .getClusterMetrics().getNumNodeManagers()) {
- return;
- }
- Thread.sleep(100);
- }
- fail("NodeManager never registered with the RM");
+ assertTrue("NMs fail to connect to the RM",
+ cluster.waitForNodeManagersToConnect(5000));
}
}