From 3990e8b478b1d958479c173c74946e38360cfd17 Mon Sep 17 00:00:00 2001 From: Bikas Saha Date: Tue, 16 Jul 2013 22:54:55 +0000 Subject: [PATCH] Merge r1503933 from trunk to branch-2 for YARN-513. Create common proxy client for communicating with RM (Xuan Gong & Jian He via bikas) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1503935 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 12 +- .../hadoop/yarn/client/ClientRMProxy.java | 65 +++++++ .../hadoop/yarn/client/api/YarnClient.java | 20 --- .../yarn/client/api/impl/AMRMClientImpl.java | 26 +-- .../yarn/client/api/impl/YarnClientImpl.java | 29 +-- .../hadoop/yarn/client/cli/RMAdminCLI.java | 28 +-- .../apache/hadoop/yarn/client/RMProxy.java | 125 +++++++++++++ .../hadoop/yarn/server/api/ServerRMProxy.java | 55 ++++++ .../client/ResourceTrackerPBClientImpl.java | 12 +- .../nodemanager/NodeStatusUpdaterImpl.java | 138 +++------------ .../nodemanager/MockNodeStatusUpdater.java | 4 + .../server/nodemanager/TestEventFlow.java | 5 + .../nodemanager/TestNodeStatusUpdater.java | 167 ++++++++++++------ .../BaseContainerManagerTest.java | 5 + .../hadoop/yarn/server/MiniYARNCluster.java | 5 + 16 files changed, 433 insertions(+), 266 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 65d19bff9d8..4d6cb00b23e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -465,6 +465,9 @@ Release 2.1.0-beta - 2013-07-02 YARN-521. Augment AM - RM client module to be able to request containers only at specific locations (Sandy Ryza via bikas) + YARN-513. Create common proxy client for communicating with RM. (Xuan Gong + & Jian He via bikas) + OPTIMIZATIONS YARN-512. Log aggregation root directory check is more expensive than it diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 44c35c3d58b..b14e6522520 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -655,17 +655,17 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS = 2000; - /** Max time to wait to establish a connection to RM when NM starts + /** Max time to wait to establish a connection to RM */ - public static final String RESOURCEMANAGER_CONNECT_WAIT_SECS = - NM_PREFIX + "resourcemanager.connect.wait.secs"; - public static final int DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS = + public static final String RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS = + RM_PREFIX + "resourcemanager.connect.max.wait.secs"; + public static final int DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS = 15*60; - /** Time interval between each NM attempt to connect to RM + /** Time interval between each attempt to connect to RM */ public static final String RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS = - NM_PREFIX + "resourcemanager.connect.retry_interval.secs"; + RM_PREFIX + "resourcemanager.connect.retry_interval.secs"; public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS = 30; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java new file mode 100644 index 00000000000..f70b44ce3a8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -0,0 +1,65 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; + +public class ClientRMProxy extends RMProxy{ + + private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); + + public static T createRMProxy(final Configuration conf, + final Class protocol) throws IOException { + InetSocketAddress rmAddress = getRMAddress(conf, protocol); + return createRMProxy(conf, protocol, rmAddress); + } + + private static InetSocketAddress getRMAddress(Configuration conf, Class protocol) { + if (protocol == ApplicationClientProtocol.class) { + return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + } else if (protocol == ResourceManagerAdministrationProtocol.class) { + return conf.getSocketAddr( + YarnConfiguration.RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_PORT); + } else if (protocol == ApplicationMasterProtocol.class) { + return conf.getSocketAddr( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + } else { + String message = "Unsupported protocol found when creating the proxy " + + "connection to ResourceManager: " + + ((protocol != null) ? protocol.getClass().getName() : "null"); + LOG.error(message); + throw new IllegalStateException(message); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index e8dca61d32a..22d80c6e8d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.client.api; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.List; import java.util.Set; @@ -54,25 +53,6 @@ public abstract class YarnClient extends AbstractService { return client; } - /** - * Create a new instance of YarnClient. - */ - @Public - public static YarnClient createYarnClient(InetSocketAddress rmAddress) { - YarnClient client = new YarnClientImpl(rmAddress); - return client; - } - - /** - * Create a new instance of YarnClient. - */ - @Public - public static YarnClient createYarnClient(String name, - InetSocketAddress rmAddress) { - YarnClient client = new YarnClientImpl(name, rmAddress); - return client; - } - @Private protected YarnClient(String name) { super(name); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 0f088a0604b..4119a0cb1de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -42,7 +40,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -56,16 +53,16 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; import org.apache.hadoop.yarn.client.api.NMTokenCache; -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.util.RackResolver; import com.google.common.annotations.VisibleForTesting; @@ -171,28 +168,11 @@ public class AMRMClientImpl extends AMRMClient { @Override protected void serviceStart() throws Exception { final YarnConfiguration conf = new YarnConfiguration(getConfig()); - final YarnRPC rpc = YarnRPC.create(conf); - final InetSocketAddress rmAddress = conf.getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - - UserGroupInformation currentUser; try { - currentUser = UserGroupInformation.getCurrentUser(); + rmClient = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class); } catch (IOException e) { throw new YarnRuntimeException(e); } - - // CurrentUser should already have AMToken loaded. - rmClient = currentUser.doAs(new PrivilegedAction() { - @Override - public ApplicationMasterProtocol run() { - return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, - conf); - } - }); - LOG.debug("Connecting to ResourceManager at " + rmAddress); super.serviceStart(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index b3b8bdf4316..4398359862b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -59,11 +59,12 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.Records; import com.google.common.annotations.VisibleForTesting; @@ -81,16 +82,7 @@ public class YarnClientImpl extends YarnClient { private static final String ROOT = "root"; public YarnClientImpl() { - this(null); - } - - public YarnClientImpl(InetSocketAddress rmAddress) { - this(YarnClientImpl.class.getName(), rmAddress); - } - - public YarnClientImpl(String name, InetSocketAddress rmAddress) { - super(name); - this.rmAddress = rmAddress; + super(YarnClientImpl.class.getName()); } private static InetSocketAddress getRmAddress(Configuration conf) { @@ -100,9 +92,7 @@ public class YarnClientImpl extends YarnClient { @Override protected void serviceInit(Configuration conf) throws Exception { - if (this.rmAddress == null) { - this.rmAddress = getRmAddress(conf); - } + this.rmAddress = getRmAddress(conf); statePollIntervalMillis = conf.getLong( YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS); @@ -111,12 +101,11 @@ public class YarnClientImpl extends YarnClient { @Override protected void serviceStart() throws Exception { - YarnRPC rpc = YarnRPC.create(getConfig()); - - this.rmClient = (ApplicationClientProtocol) rpc.getProxy( - ApplicationClientProtocol.class, rmAddress, getConfig()); - if (LOG.isDebugEnabled()) { - LOG.debug("Connecting to ResourceManager at " + rmAddress); + try { + rmClient = ClientRMProxy.createRMProxy(getConfig(), + ApplicationClientProtocol.class); + } catch (IOException e) { + throw new YarnRuntimeException(e); } super.serviceStart(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 6426fe9dbc7..11335c0d8f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.client.cli; import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.PrivilegedAction; import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -31,11 +29,11 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -164,32 +162,10 @@ public class RMAdminCLI extends Configured implements Tool { } } - private static UserGroupInformation getUGI(Configuration conf - ) throws IOException { - return UserGroupInformation.getCurrentUser(); - } - private ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException { // Get the current configuration final YarnConfiguration conf = new YarnConfiguration(getConf()); - - // Create the client - final InetSocketAddress addr = conf.getSocketAddr( - YarnConfiguration.RM_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADMIN_PORT); - final YarnRPC rpc = YarnRPC.create(conf); - - ResourceManagerAdministrationProtocol adminProtocol = - getUGI(conf).doAs(new PrivilegedAction() { - @Override - public ResourceManagerAdministrationProtocol run() { - return (ResourceManagerAdministrationProtocol) rpc.getProxy(ResourceManagerAdministrationProtocol.class, - addr, conf); - } - }); - - return adminProtocol; + return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class); } private int refreshQueues() throws IOException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java new file mode 100644 index 00000000000..e4493b5a469 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RMProxy { + + private static final Log LOG = LogFactory.getLog(RMProxy.class); + + @SuppressWarnings("unchecked") + public static T createRMProxy(final Configuration conf, + final Class protocol, InetSocketAddress rmAddress) throws IOException { + RetryPolicy retryPolicy = createRetryPolicy(conf); + T proxy = RMProxy.getProxy(conf, protocol, rmAddress); + LOG.info("Connecting to ResourceManager at " + rmAddress); + return (T) RetryProxy.create(protocol, proxy, retryPolicy); + } + + @SuppressWarnings("unchecked") + protected static T getProxy(final Configuration conf, + final Class protocol, final InetSocketAddress rmAddress) + throws IOException { + return (T) UserGroupInformation.getCurrentUser().doAs( + new PrivilegedAction() { + + @Override + public T run() { + return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf); + } + }); + } + + public static RetryPolicy createRetryPolicy(Configuration conf) { + long rmConnectWaitMS = + conf.getInt( + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS) + * 1000; + long rmConnectionRetryIntervalMS = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, + YarnConfiguration + .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS) + * 1000; + + if (rmConnectionRetryIntervalMS < 0) { + throw new YarnRuntimeException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + + " should not be negative."); + } + + boolean waitForEver = (rmConnectWaitMS == -1000); + + if (waitForEver) { + return RetryPolicies.RETRY_FOREVER; + } else { + if (rmConnectWaitMS < 0) { + throw new YarnRuntimeException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS + + " can be -1, but can not be other negative numbers"); + } + + // try connect once + if (rmConnectWaitMS < rmConnectionRetryIntervalMS) { + LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS + + " is smaller than " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + + ". Only try connect once."); + rmConnectWaitMS = 0; + } + } + + RetryPolicy retryPolicy = + RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS, + rmConnectionRetryIntervalMS, + TimeUnit.MILLISECONDS); + + Map, RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(ConnectException.class, retryPolicy); + //TO DO: after HADOOP-9576, IOException can be changed to EOFException + exceptionToPolicyMap.put(IOException.class, retryPolicy); + + return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, + exceptionToPolicyMap); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java new file mode 100644 index 00000000000..ef9154fde1b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java @@ -0,0 +1,55 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.client.RMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +public class ServerRMProxy extends RMProxy{ + + private static final Log LOG = LogFactory.getLog(ServerRMProxy.class); + + public static T createRMProxy(final Configuration conf, + final Class protocol) throws IOException { + InetSocketAddress rmAddress = getRMAddress(conf, protocol); + return createRMProxy(conf, protocol, rmAddress); + } + + private static InetSocketAddress getRMAddress(Configuration conf, Class protocol) { + if (protocol == ResourceTracker.class) { + return conf.getSocketAddr( + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); + } + else { + String message = "Unsupported protocol found when creating the proxy " + + "connection to ResourceManager: " + + ((protocol != null) ? protocol.getClass().getName() : "null"); + LOG.error(message); + throw new IllegalStateException(message); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java index 396204cf2db..40f6874623f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.api.impl.pb.client; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; @@ -41,7 +42,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeMan import com.google.protobuf.ServiceException; -public class ResourceTrackerPBClientImpl implements ResourceTracker { +public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable { private ResourceTrackerPB proxy; @@ -50,7 +51,14 @@ private ResourceTrackerPB proxy; proxy = (ResourceTrackerPB)RPC.getProxy( ResourceTrackerPB.class, clientVersion, addr, conf); } - + + @Override + public void close() { + if(this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 550cdc5a98f..b0e71e91563 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; -import java.net.InetSocketAddress; +import java.net.ConnectException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -33,6 +33,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -47,9 +48,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.ServerRMProxy; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -77,7 +78,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private NodeId nodeId; private long nextHeartBeatInterval; private ResourceTracker resourceTracker; - private InetSocketAddress rmAddress; private Resource totalResource; private int httpPort; private volatile boolean isStopped; @@ -91,9 +91,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; - private long rmConnectWaitMS; - private long rmConnectionRetryIntervalMS; - private boolean waitForEver; private Runnable statusUpdaterRunnable; private Thread statusUpdater; @@ -110,11 +107,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @Override protected void serviceInit(Configuration conf) throws Exception { - this.rmAddress = conf.getSocketAddr( - YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, - YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, - YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); - int memoryMb = conf.getInt( YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB); @@ -153,6 +145,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements try { // Registration has to be in start so that ContainerManager can get the // perNM tokens needed to authenticate ContainerTokens. + this.resourceTracker = getRMClient(); registerWithRM(); super.serviceStart(); startStatusUpdater(); @@ -167,6 +160,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements protected void serviceStop() throws Exception { // Interrupt the updater. this.isStopped = true; + stopRMProxy(); super.serviceStop(); } @@ -188,6 +182,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } + @VisibleForTesting + protected void stopRMProxy() { + if(this.resourceTracker != null) { + RPC.stopProxy(this.resourceTracker); + } + } + @Private protected boolean isTokenKeepAliveEnabled(Configuration conf) { return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, @@ -195,93 +196,22 @@ public class NodeStatusUpdaterImpl extends AbstractService implements && UserGroupInformation.isSecurityEnabled(); } - protected ResourceTracker getRMClient() { + @VisibleForTesting + protected ResourceTracker getRMClient() throws IOException { Configuration conf = getConfig(); - YarnRPC rpc = YarnRPC.create(conf); - return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress, - conf); + return ServerRMProxy.createRMProxy(conf, ResourceTracker.class); } @VisibleForTesting protected void registerWithRM() throws YarnException, IOException { - Configuration conf = getConfig(); - rmConnectWaitMS = - conf.getInt( - YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, - YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) - * 1000; - rmConnectionRetryIntervalMS = - conf.getLong( - YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, - YarnConfiguration - .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS) - * 1000; - - if(rmConnectionRetryIntervalMS < 0) { - throw new YarnRuntimeException("Invalid Configuration. " + - YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + - " should not be negative."); - } - - waitForEver = (rmConnectWaitMS == -1000); - - if(! waitForEver) { - if(rmConnectWaitMS < 0) { - throw new YarnRuntimeException("Invalid Configuration. " + - YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + - " can be -1, but can not be other negative numbers"); - } - - //try connect once - if(rmConnectWaitMS < rmConnectionRetryIntervalMS) { - LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS - + " is smaller than " - + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS - + ". Only try connect once."); - rmConnectWaitMS = 0; - } - } - - int rmRetryCount = 0; - long waitStartTime = System.currentTimeMillis(); - RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); request.setHttpPort(this.httpPort); request.setResource(this.totalResource); request.setNodeId(this.nodeId); - RegisterNodeManagerResponse regNMResponse; - - while(true) { - try { - rmRetryCount++; - LOG.info("Connecting to ResourceManager at " + this.rmAddress - + ". current no. of attempts is " + rmRetryCount); - this.resourceTracker = getRMClient(); - regNMResponse = - this.resourceTracker.registerNodeManager(request); - this.rmIdentifier = regNMResponse.getRMIdentifier(); - break; - } catch(Throwable e) { - LOG.warn("Trying to connect to ResourceManager, " + - "current no. of failed attempts is "+rmRetryCount); - if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS - || waitForEver) { - try { - LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 - + " seconds before next connection retry to RM"); - Thread.sleep(rmConnectionRetryIntervalMS); - } catch(InterruptedException ex) { - //done nothing - } - } else { - String errorMessage = "Failed to Connect to RM, " + - "no. of failed attempts is "+rmRetryCount; - LOG.error(errorMessage,e); - throw new YarnRuntimeException(errorMessage,e); - } - } - } + RegisterNodeManagerResponse regNMResponse = + resourceTracker.registerNodeManager(request); + this.rmIdentifier = regNMResponse.getRMIdentifier(); // if the Resourcemanager instructs NM to shutdown. if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) { String message = @@ -426,8 +356,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // Send heartbeat try { NodeHeartbeatResponse response = null; - int rmRetryCount = 0; - long waitStartTime = System.currentTimeMillis(); NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext(); nodeStatus.setResponseId(lastHeartBeatID); @@ -440,31 +368,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements request .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context .getNMTokenSecretManager().getCurrentKey()); - while (!isStopped) { - try { - rmRetryCount++; - response = resourceTracker.nodeHeartbeat(request); - break; - } catch (Throwable e) { - LOG.warn("Trying to heartbeat to ResourceManager, " - + "current no. of failed attempts is " + rmRetryCount); - if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS - || waitForEver) { - try { - LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 - + " seconds before next heartbeat to RM"); - Thread.sleep(rmConnectionRetryIntervalMS); - } catch(InterruptedException ex) { - //done nothing - } - } else { - String errorMessage = "Failed to heartbeat to RM, " + - "no. of failed attempts is "+rmRetryCount; - LOG.error(errorMessage,e); - throw new YarnRuntimeException(errorMessage,e); - } - } - } + response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); updateMasterKeys(response); @@ -508,11 +412,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements dispatcher.getEventHandler().handle( new CMgrCompletedAppsEvent(appsToCleanup)); } - } catch (YarnRuntimeException e) { + } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); - throw e; + throw new YarnRuntimeException(e); } catch (Throwable e) { // TODO Better error handling. Thread can die with the rest of the // NM still running. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java index e93778e2987..a3e1faf310e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java @@ -61,6 +61,10 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl { protected ResourceTracker getRMClient() { return resourceTracker; } + @Override + protected void stopRMProxy() { + return; + } private static class MockResourceTracker implements ResourceTracker { private int heartBeatID; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index 668b85b6511..294c93ed3b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -107,6 +107,11 @@ public class TestEventFlow { return new LocalRMInterface(); }; + @Override + protected void stopRMProxy() { + return; + } + @Override protected void startStatusUpdater() { return; // Don't start any updating thread. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index e17131fd3a1..2a3e3d579ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -41,6 +41,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.ServiceOperations; @@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -60,9 +63,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.ServerRMProxy; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -103,11 +106,17 @@ public class TestNodeStatusUpdater { volatile int heartBeatID = 0; volatile Throwable nmStartError = null; private final List registeredNodes = new ArrayList(); - private final Configuration conf = createNMConfig(); + private boolean triggered = false; + private Configuration conf; private NodeManager nm; private boolean containerStatusBackupSuccessfully = true; private List completedContainerStatusList = new ArrayList(); + @Before + public void setUp() { + conf = createNMConfig(); + } + @After public void tearDown() { this.registeredNodes.clear(); @@ -274,6 +283,11 @@ public class TestNodeStatusUpdater { protected ResourceTracker getRMClient() { return resourceTracker; } + + @Override + protected void stopRMProxy() { + return; + } } private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl { @@ -290,6 +304,10 @@ public class TestNodeStatusUpdater { return resourceTracker; } + @Override + protected void stopRMProxy() { + return; + } } private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl { @@ -307,7 +325,12 @@ public class TestNodeStatusUpdater { protected ResourceTracker getRMClient() { return resourceTracker; } - + + @Override + protected void stopRMProxy() { + return; + } + @Override protected boolean isTokenKeepAliveEnabled(Configuration conf) { return true; @@ -315,21 +338,16 @@ public class TestNodeStatusUpdater { } private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl { - public ResourceTracker resourceTracker = - new MyResourceTracker(this.context); + private Context context; - private long waitStartTime; private final long rmStartIntervalMS; private final boolean rmNeverStart; - private volatile boolean triggered = false; - private long durationWhenTriggered = -1; - + public ResourceTracker resourceTracker; public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, long rmStartIntervalMS, boolean rmNeverStart) { super(context, dispatcher, healthChecker, metrics); this.context = context; - this.waitStartTime = System.currentTimeMillis(); this.rmStartIntervalMS = rmStartIntervalMS; this.rmNeverStart = rmNeverStart; } @@ -337,25 +355,16 @@ public class TestNodeStatusUpdater { @Override protected void serviceStart() throws Exception { //record the startup time - this.waitStartTime = System.currentTimeMillis(); super.serviceStart(); } @Override - protected ResourceTracker getRMClient() { - if (!triggered) { - long t = System.currentTimeMillis(); - long duration = t - waitStartTime; - if (duration <= rmStartIntervalMS - || rmNeverStart) { - throw new YarnRuntimeException("Faking RM start failure as start " + - "delay timer has not expired."); - } else { - //triggering - triggered = true; - durationWhenTriggered = duration; - } - } + protected ResourceTracker getRMClient() throws IOException { + RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); + resourceTracker = + (ResourceTracker) RetryProxy.create(ResourceTracker.class, + new MyResourceTracker6(this.context, rmStartIntervalMS, + rmNeverStart), retryPolicy); return resourceTracker; } @@ -363,37 +372,35 @@ public class TestNodeStatusUpdater { return triggered; } - private long getWaitStartTime() { - return waitStartTime; - } - - private long getDurationWhenTriggered() { - return durationWhenTriggered; - } - @Override - public String toString() { - return "MyNodeStatusUpdater4{" + - "rmNeverStart=" + rmNeverStart + - ", triggered=" + triggered + - ", duration=" + durationWhenTriggered + - ", rmStartIntervalMS=" + rmStartIntervalMS + - '}'; + protected void stopRMProxy() { + return; } } + + private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl { private ResourceTracker resourceTracker; + private Configuration conf; public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) { super(context, dispatcher, healthChecker, metrics); resourceTracker = new MyResourceTracker5(); + this.conf = conf; } @Override protected ResourceTracker getRMClient() { - return resourceTracker; + RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); + return (ResourceTracker) RetryProxy.create(ResourceTracker.class, + resourceTracker, retryPolicy); + } + + @Override + protected void stopRMProxy() { + return; } } @@ -417,15 +424,18 @@ public class TestNodeStatusUpdater { public boolean isStopped = false; private NodeStatusUpdater nodeStatusUpdater; private CyclicBarrier syncBarrier; - public MyNodeManager2 (CyclicBarrier syncBarrier) { + private Configuration conf; + + public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) { this.syncBarrier = syncBarrier; + this.conf = conf; } @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { nodeStatusUpdater = new MyNodeStatusUpdater5(context, dispatcher, healthChecker, - metrics); + metrics, conf); return nodeStatusUpdater; } @@ -577,7 +587,7 @@ public class TestNodeStatusUpdater { .get(4).getState() == ContainerState.RUNNING && request.getNodeStatus().getContainersStatuses().get(4) .getContainerId().getId() == 5); - throw new YarnRuntimeException("Lost the heartbeat response"); + throw new java.net.ConnectException("Lost the heartbeat response"); } else if (heartBeatID == 2) { Assert.assertEquals(request.getNodeStatus().getContainersStatuses() .size(), 7); @@ -646,7 +656,63 @@ public class TestNodeStatusUpdater { public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { heartBeatID++; - throw RPCUtil.getRemoteException("NodeHeartbeat exception"); + throw new java.net.ConnectException( + "NodeHeartbeat exception"); + } + } + + private class MyResourceTracker6 implements ResourceTracker { + + private final Context context; + private long rmStartIntervalMS; + private boolean rmNeverStart; + private final long waitStartTime; + + public MyResourceTracker6(Context context, long rmStartIntervalMS, + boolean rmNeverStart) { + this.context = context; + this.rmStartIntervalMS = rmStartIntervalMS; + this.rmNeverStart = rmNeverStart; + this.waitStartTime = System.currentTimeMillis(); + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnException, IOException, + IOException { + if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS + || rmNeverStart) { + throw new java.net.ConnectException("Faking RM start failure as start " + + "delay timer has not expired."); + } else { + NodeId nodeId = request.getNodeId(); + Resource resource = request.getResource(); + LOG.info("Registering " + nodeId.toString()); + // NOTE: this really should be checking against the config value + InetSocketAddress expected = NetUtils.getConnectAddress( + conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1)); + Assert.assertEquals(NetUtils.getHostPortString(expected), + nodeId.toString()); + Assert.assertEquals(5 * 1024, resource.getMemory()); + registeredNodes.add(nodeId); + + RegisterNodeManagerResponse response = recordFactory + .newRecordInstance(RegisterNodeManagerResponse.class); + triggered = true; + return response; + } + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnException, IOException { + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartBeatID++); + + NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. + newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null, + null, null, null, 1000L); + return nhResponse; } } @@ -843,8 +909,7 @@ public class TestNodeStatusUpdater { final long connectionRetryIntervalSecs = 1; //Waiting for rmStartIntervalMS, RM will be started final long rmStartIntervalMS = 2*1000; - YarnConfiguration conf = createNMConfig(); - conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS, connectionWaitSecs); conf.setLong(YarnConfiguration .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, @@ -907,8 +972,6 @@ public class TestNodeStatusUpdater { } long duration = System.currentTimeMillis() - waitStartTime; MyNodeStatusUpdater4 myUpdater = (MyNodeStatusUpdater4) updater; - Assert.assertTrue("Updater was never started", - myUpdater.getWaitStartTime()>0); Assert.assertTrue("NM started before updater triggered", myUpdater.isTriggered()); Assert.assertTrue("NM should have connected to RM after " @@ -1037,13 +1100,13 @@ public class TestNodeStatusUpdater { final long connectionWaitSecs = 1; final long connectionRetryIntervalSecs = 1; YarnConfiguration conf = createNMConfig(); - conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS, connectionWaitSecs); conf.setLong(YarnConfiguration .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, connectionRetryIntervalSecs); CyclicBarrier syncBarrier = new CyclicBarrier(2); - nm = new MyNodeManager2(syncBarrier); + nm = new MyNodeManager2(syncBarrier, conf); nm.init(conf); nm.start(); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 83d21e16407..cfcf7f6445e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -117,6 +117,11 @@ public abstract class BaseContainerManagerTest { return new LocalRMInterface(); }; + @Override + protected void stopRMProxy() { + return; + } + @Override protected void startStatusUpdater() { return; // Don't start any updating thread. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index cc529739dea..144b111f830 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -390,6 +390,11 @@ public class MiniYARNCluster extends CompositeService { } }; }; + + @Override + protected void stopRMProxy() { + return; + } }; }; }