Merge r1503933 from trunk to branch-2 for YARN-513. Create common proxy client for communicating with RM (Xuan Gong & Jian He via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1503935 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-07-16 22:54:55 +00:00
parent 0055e40226
commit 3990e8b478
16 changed files with 433 additions and 266 deletions

View File

@ -465,6 +465,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-521. Augment AM - RM client module to be able to request containers
only at specific locations (Sandy Ryza via bikas)
YARN-513. Create common proxy client for communicating with RM. (Xuan Gong
& Jian He via bikas)
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -655,17 +655,17 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS =
2000;
/** Max time to wait to establish a connection to RM when NM starts
/** Max time to wait to establish a connection to RM
*/
public static final String RESOURCEMANAGER_CONNECT_WAIT_SECS =
NM_PREFIX + "resourcemanager.connect.wait.secs";
public static final int DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS =
public static final String RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS =
RM_PREFIX + "resourcemanager.connect.max.wait.secs";
public static final int DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS =
15*60;
/** Time interval between each NM attempt to connect to RM
/** Time interval between each attempt to connect to RM
*/
public static final String RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS =
NM_PREFIX + "resourcemanager.connect.retry_interval.secs";
RM_PREFIX + "resourcemanager.connect.retry_interval.secs";
public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
= 30;

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
public class ClientRMProxy<T> extends RMProxy<T>{
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
public static <T> T createRMProxy(final Configuration conf,
final Class<T> protocol) throws IOException {
InetSocketAddress rmAddress = getRMAddress(conf, protocol);
return createRMProxy(conf, protocol, rmAddress);
}
private static InetSocketAddress getRMAddress(Configuration conf, Class<?> protocol) {
if (protocol == ApplicationClientProtocol.class) {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
} else if (protocol == ResourceManagerAdministrationProtocol.class) {
return conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
} else if (protocol == ApplicationMasterProtocol.class) {
return conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
} else {
String message = "Unsupported protocol found when creating the proxy " +
"connection to ResourceManager: " +
((protocol != null) ? protocol.getClass().getName() : "null");
LOG.error(message);
throw new IllegalStateException(message);
}
}
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Set;
@ -54,25 +53,6 @@ public static YarnClient createYarnClient() {
return client;
}
/**
* Create a new instance of YarnClient.
*/
@Public
public static YarnClient createYarnClient(InetSocketAddress rmAddress) {
YarnClient client = new YarnClientImpl(rmAddress);
return client;
}
/**
* Create a new instance of YarnClient.
*/
@Public
public static YarnClient createYarnClient(String name,
InetSocketAddress rmAddress) {
YarnClient client = new YarnClientImpl(name, rmAddress);
return client;
}
@Private
protected YarnClient(String name) {
super(name);

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -42,7 +40,6 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -56,16 +53,16 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
@ -171,28 +168,11 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override
protected void serviceStart() throws Exception {
final YarnConfiguration conf = new YarnConfiguration(getConfig());
final YarnRPC rpc = YarnRPC.create(conf);
final InetSocketAddress rmAddress = conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
UserGroupInformation currentUser;
try {
currentUser = UserGroupInformation.getCurrentUser();
rmClient = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
// CurrentUser should already have AMToken loaded.
rmClient = currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
@Override
public ApplicationMasterProtocol run() {
return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress,
conf);
}
});
LOG.debug("Connecting to ResourceManager at " + rmAddress);
super.serviceStart();
}

View File

@ -59,11 +59,12 @@
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.annotations.VisibleForTesting;
@ -81,16 +82,7 @@ public class YarnClientImpl extends YarnClient {
private static final String ROOT = "root";
public YarnClientImpl() {
this(null);
}
public YarnClientImpl(InetSocketAddress rmAddress) {
this(YarnClientImpl.class.getName(), rmAddress);
}
public YarnClientImpl(String name, InetSocketAddress rmAddress) {
super(name);
this.rmAddress = rmAddress;
super(YarnClientImpl.class.getName());
}
private static InetSocketAddress getRmAddress(Configuration conf) {
@ -100,9 +92,7 @@ private static InetSocketAddress getRmAddress(Configuration conf) {
@Override
protected void serviceInit(Configuration conf) throws Exception {
if (this.rmAddress == null) {
this.rmAddress = getRmAddress(conf);
}
this.rmAddress = getRmAddress(conf);
statePollIntervalMillis = conf.getLong(
YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
@ -111,12 +101,11 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override
protected void serviceStart() throws Exception {
YarnRPC rpc = YarnRPC.create(getConfig());
this.rmClient = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, getConfig());
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to ResourceManager at " + rmAddress);
try {
rmClient = ClientRMProxy.createRMProxy(getConfig(),
ApplicationClientProtocol.class);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
super.serviceStart();
}

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.yarn.client.cli;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -31,11 +29,11 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@ -164,32 +162,10 @@ private static void printUsage(String cmd) {
}
}
private static UserGroupInformation getUGI(Configuration conf
) throws IOException {
return UserGroupInformation.getCurrentUser();
}
private ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException {
// Get the current configuration
final YarnConfiguration conf = new YarnConfiguration(getConf());
// Create the client
final InetSocketAddress addr = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
final YarnRPC rpc = YarnRPC.create(conf);
ResourceManagerAdministrationProtocol adminProtocol =
getUGI(conf).doAs(new PrivilegedAction<ResourceManagerAdministrationProtocol>() {
@Override
public ResourceManagerAdministrationProtocol run() {
return (ResourceManagerAdministrationProtocol) rpc.getProxy(ResourceManagerAdministrationProtocol.class,
addr, conf);
}
});
return adminProtocol;
return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class);
}
private int refreshQueues() throws IOException, YarnException {

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class RMProxy<T> {
private static final Log LOG = LogFactory.getLog(RMProxy.class);
@SuppressWarnings("unchecked")
public static <T> T createRMProxy(final Configuration conf,
final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
RetryPolicy retryPolicy = createRetryPolicy(conf);
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
LOG.info("Connecting to ResourceManager at " + rmAddress);
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
}
@SuppressWarnings("unchecked")
protected static <T> T getProxy(final Configuration conf,
final Class<T> protocol, final InetSocketAddress rmAddress)
throws IOException {
return (T) UserGroupInformation.getCurrentUser().doAs(
new PrivilegedAction<Object>() {
@Override
public T run() {
return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
}
});
}
public static RetryPolicy createRetryPolicy(Configuration conf) {
long rmConnectWaitMS =
conf.getInt(
YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS)
* 1000;
long rmConnectionRetryIntervalMS =
conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
YarnConfiguration
.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
* 1000;
if (rmConnectionRetryIntervalMS < 0) {
throw new YarnRuntimeException("Invalid Configuration. " +
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS +
" should not be negative.");
}
boolean waitForEver = (rmConnectWaitMS == -1000);
if (waitForEver) {
return RetryPolicies.RETRY_FOREVER;
} else {
if (rmConnectWaitMS < 0) {
throw new YarnRuntimeException("Invalid Configuration. "
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS
+ " can be -1, but can not be other negative numbers");
}
// try connect once
if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS
+ " is smaller than "
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+ ". Only try connect once.");
rmConnectWaitMS = 0;
}
}
RetryPolicy retryPolicy =
RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
rmConnectionRetryIntervalMS,
TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
//TO DO: after HADOOP-9576, IOException can be changed to EOFException
exceptionToPolicyMap.put(IOException.class, retryPolicy);
return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
exceptionToPolicyMap);
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
public class ServerRMProxy<T> extends RMProxy<T>{
private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
public static <T> T createRMProxy(final Configuration conf,
final Class<T> protocol) throws IOException {
InetSocketAddress rmAddress = getRMAddress(conf, protocol);
return createRMProxy(conf, protocol, rmAddress);
}
private static InetSocketAddress getRMAddress(Configuration conf, Class<?> protocol) {
if (protocol == ResourceTracker.class) {
return conf.getSocketAddr(
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
}
else {
String message = "Unsupported protocol found when creating the proxy " +
"connection to ResourceManager: " +
((protocol != null) ? protocol.getClass().getName() : "null");
LOG.error(message);
throw new IllegalStateException(message);
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.api.impl.pb.client;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -41,7 +42,7 @@
import com.google.protobuf.ServiceException;
public class ResourceTrackerPBClientImpl implements ResourceTracker {
public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {
private ResourceTrackerPB proxy;
@ -50,7 +51,14 @@ public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, C
proxy = (ResourceTrackerPB)RPC.getProxy(
ResourceTrackerPB.class, clientVersion, addr, conf);
}
@Override
public void close() {
if(this.proxy != null) {
RPC.stopProxy(this.proxy);
}
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -33,6 +33,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -47,9 +48,9 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@ -77,7 +78,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private NodeId nodeId;
private long nextHeartBeatInterval;
private ResourceTracker resourceTracker;
private InetSocketAddress rmAddress;
private Resource totalResource;
private int httpPort;
private volatile boolean isStopped;
@ -91,9 +91,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
private long rmConnectWaitMS;
private long rmConnectionRetryIntervalMS;
private boolean waitForEver;
private Runnable statusUpdaterRunnable;
private Thread statusUpdater;
@ -110,11 +107,6 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.rmAddress = conf.getSocketAddr(
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
int memoryMb =
conf.getInt(
YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
@ -153,6 +145,7 @@ protected void serviceStart() throws Exception {
try {
// Registration has to be in start so that ContainerManager can get the
// perNM tokens needed to authenticate ContainerTokens.
this.resourceTracker = getRMClient();
registerWithRM();
super.serviceStart();
startStatusUpdater();
@ -167,6 +160,7 @@ protected void serviceStart() throws Exception {
protected void serviceStop() throws Exception {
// Interrupt the updater.
this.isStopped = true;
stopRMProxy();
super.serviceStop();
}
@ -188,6 +182,13 @@ protected void rebootNodeStatusUpdater() {
}
}
@VisibleForTesting
protected void stopRMProxy() {
if(this.resourceTracker != null) {
RPC.stopProxy(this.resourceTracker);
}
}
@Private
protected boolean isTokenKeepAliveEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
@ -195,93 +196,22 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) {
&& UserGroupInformation.isSecurityEnabled();
}
protected ResourceTracker getRMClient() {
@VisibleForTesting
protected ResourceTracker getRMClient() throws IOException {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
conf);
return ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
}
@VisibleForTesting
protected void registerWithRM() throws YarnException, IOException {
Configuration conf = getConfig();
rmConnectWaitMS =
conf.getInt(
YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
* 1000;
rmConnectionRetryIntervalMS =
conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
YarnConfiguration
.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
* 1000;
if(rmConnectionRetryIntervalMS < 0) {
throw new YarnRuntimeException("Invalid Configuration. " +
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS +
" should not be negative.");
}
waitForEver = (rmConnectWaitMS == -1000);
if(! waitForEver) {
if(rmConnectWaitMS < 0) {
throw new YarnRuntimeException("Invalid Configuration. " +
YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS +
" can be -1, but can not be other negative numbers");
}
//try connect once
if(rmConnectWaitMS < rmConnectionRetryIntervalMS) {
LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS
+ " is smaller than "
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+ ". Only try connect once.");
rmConnectWaitMS = 0;
}
}
int rmRetryCount = 0;
long waitStartTime = System.currentTimeMillis();
RegisterNodeManagerRequest request =
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
request.setHttpPort(this.httpPort);
request.setResource(this.totalResource);
request.setNodeId(this.nodeId);
RegisterNodeManagerResponse regNMResponse;
while(true) {
try {
rmRetryCount++;
LOG.info("Connecting to ResourceManager at " + this.rmAddress
+ ". current no. of attempts is " + rmRetryCount);
this.resourceTracker = getRMClient();
regNMResponse =
this.resourceTracker.registerNodeManager(request);
this.rmIdentifier = regNMResponse.getRMIdentifier();
break;
} catch(Throwable e) {
LOG.warn("Trying to connect to ResourceManager, " +
"current no. of failed attempts is "+rmRetryCount);
if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
|| waitForEver) {
try {
LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
+ " seconds before next connection retry to RM");
Thread.sleep(rmConnectionRetryIntervalMS);
} catch(InterruptedException ex) {
//done nothing
}
} else {
String errorMessage = "Failed to Connect to RM, " +
"no. of failed attempts is "+rmRetryCount;
LOG.error(errorMessage,e);
throw new YarnRuntimeException(errorMessage,e);
}
}
}
RegisterNodeManagerResponse regNMResponse =
resourceTracker.registerNodeManager(request);
this.rmIdentifier = regNMResponse.getRMIdentifier();
// if the Resourcemanager instructs NM to shutdown.
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
String message =
@ -426,8 +356,6 @@ public void run() {
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
int rmRetryCount = 0;
long waitStartTime = System.currentTimeMillis();
NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
nodeStatus.setResponseId(lastHeartBeatID);
@ -440,31 +368,7 @@ public void run() {
request
.setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey());
while (!isStopped) {
try {
rmRetryCount++;
response = resourceTracker.nodeHeartbeat(request);
break;
} catch (Throwable e) {
LOG.warn("Trying to heartbeat to ResourceManager, "
+ "current no. of failed attempts is " + rmRetryCount);
if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
|| waitForEver) {
try {
LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
+ " seconds before next heartbeat to RM");
Thread.sleep(rmConnectionRetryIntervalMS);
} catch(InterruptedException ex) {
//done nothing
}
} else {
String errorMessage = "Failed to heartbeat to RM, " +
"no. of failed attempts is "+rmRetryCount;
LOG.error(errorMessage,e);
throw new YarnRuntimeException(errorMessage,e);
}
}
}
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
updateMasterKeys(response);
@ -508,11 +412,11 @@ public void run() {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup));
}
} catch (YarnRuntimeException e) {
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
throw e;
throw new YarnRuntimeException(e);
} catch (Throwable e) {
// TODO Better error handling. Thread can die with the rest of the
// NM still running.

View File

@ -61,6 +61,10 @@ public MockNodeStatusUpdater(Context context, Dispatcher dispatcher,
protected ResourceTracker getRMClient() {
return resourceTracker;
}
@Override
protected void stopRMProxy() {
return;
}
private static class MockResourceTracker implements ResourceTracker {
private int heartBeatID;

View File

@ -107,6 +107,11 @@ protected ResourceTracker getRMClient() {
return new LocalRMInterface();
};
@Override
protected void stopRMProxy() {
return;
}
@Override
protected void startStatusUpdater() {
return; // Don't start any updating thread.

View File

@ -41,6 +41,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.ServiceOperations;
@ -53,6 +55,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -60,9 +63,9 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@ -103,11 +106,17 @@ public class TestNodeStatusUpdater {
volatile int heartBeatID = 0;
volatile Throwable nmStartError = null;
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
private final Configuration conf = createNMConfig();
private boolean triggered = false;
private Configuration conf;
private NodeManager nm;
private boolean containerStatusBackupSuccessfully = true;
private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
@Before
public void setUp() {
conf = createNMConfig();
}
@After
public void tearDown() {
this.registeredNodes.clear();
@ -274,6 +283,11 @@ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
protected ResourceTracker getRMClient() {
return resourceTracker;
}
@Override
protected void stopRMProxy() {
return;
}
}
private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
@ -290,6 +304,10 @@ protected ResourceTracker getRMClient() {
return resourceTracker;
}
@Override
protected void stopRMProxy() {
return;
}
}
private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
@ -307,7 +325,12 @@ public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher,
protected ResourceTracker getRMClient() {
return resourceTracker;
}
@Override
protected void stopRMProxy() {
return;
}
@Override
protected boolean isTokenKeepAliveEnabled(Configuration conf) {
return true;
@ -315,21 +338,16 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) {
}
private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker =
new MyResourceTracker(this.context);
private Context context;
private long waitStartTime;
private final long rmStartIntervalMS;
private final boolean rmNeverStart;
private volatile boolean triggered = false;
private long durationWhenTriggered = -1;
public ResourceTracker resourceTracker;
public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
long rmStartIntervalMS, boolean rmNeverStart) {
super(context, dispatcher, healthChecker, metrics);
this.context = context;
this.waitStartTime = System.currentTimeMillis();
this.rmStartIntervalMS = rmStartIntervalMS;
this.rmNeverStart = rmNeverStart;
}
@ -337,25 +355,16 @@ public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
@Override
protected void serviceStart() throws Exception {
//record the startup time
this.waitStartTime = System.currentTimeMillis();
super.serviceStart();
}
@Override
protected ResourceTracker getRMClient() {
if (!triggered) {
long t = System.currentTimeMillis();
long duration = t - waitStartTime;
if (duration <= rmStartIntervalMS
|| rmNeverStart) {
throw new YarnRuntimeException("Faking RM start failure as start " +
"delay timer has not expired.");
} else {
//triggering
triggered = true;
durationWhenTriggered = duration;
}
}
protected ResourceTracker getRMClient() throws IOException {
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
resourceTracker =
(ResourceTracker) RetryProxy.create(ResourceTracker.class,
new MyResourceTracker6(this.context, rmStartIntervalMS,
rmNeverStart), retryPolicy);
return resourceTracker;
}
@ -363,37 +372,35 @@ private boolean isTriggered() {
return triggered;
}
private long getWaitStartTime() {
return waitStartTime;
}
private long getDurationWhenTriggered() {
return durationWhenTriggered;
}
@Override
public String toString() {
return "MyNodeStatusUpdater4{" +
"rmNeverStart=" + rmNeverStart +
", triggered=" + triggered +
", duration=" + durationWhenTriggered +
", rmStartIntervalMS=" + rmStartIntervalMS +
'}';
protected void stopRMProxy() {
return;
}
}
private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
private ResourceTracker resourceTracker;
private Configuration conf;
public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) {
super(context, dispatcher, healthChecker, metrics);
resourceTracker = new MyResourceTracker5();
this.conf = conf;
}
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
return (ResourceTracker) RetryProxy.create(ResourceTracker.class,
resourceTracker, retryPolicy);
}
@Override
protected void stopRMProxy() {
return;
}
}
@ -417,15 +424,18 @@ private class MyNodeManager2 extends NodeManager {
public boolean isStopped = false;
private NodeStatusUpdater nodeStatusUpdater;
private CyclicBarrier syncBarrier;
public MyNodeManager2 (CyclicBarrier syncBarrier) {
private Configuration conf;
public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) {
this.syncBarrier = syncBarrier;
this.conf = conf;
}
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
nodeStatusUpdater =
new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
metrics);
metrics, conf);
return nodeStatusUpdater;
}
@ -577,7 +587,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
.get(4).getState() == ContainerState.RUNNING
&& request.getNodeStatus().getContainersStatuses().get(4)
.getContainerId().getId() == 5);
throw new YarnRuntimeException("Lost the heartbeat response");
throw new java.net.ConnectException("Lost the heartbeat response");
} else if (heartBeatID == 2) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
.size(), 7);
@ -646,7 +656,63 @@ public RegisterNodeManagerResponse registerNodeManager(
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
heartBeatID++;
throw RPCUtil.getRemoteException("NodeHeartbeat exception");
throw new java.net.ConnectException(
"NodeHeartbeat exception");
}
}
private class MyResourceTracker6 implements ResourceTracker {
private final Context context;
private long rmStartIntervalMS;
private boolean rmNeverStart;
private final long waitStartTime;
public MyResourceTracker6(Context context, long rmStartIntervalMS,
boolean rmNeverStart) {
this.context = context;
this.rmStartIntervalMS = rmStartIntervalMS;
this.rmNeverStart = rmNeverStart;
this.waitStartTime = System.currentTimeMillis();
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException, IOException,
IOException {
if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
|| rmNeverStart) {
throw new java.net.ConnectException("Faking RM start failure as start "
+ "delay timer has not expired.");
} else {
NodeId nodeId = request.getNodeId();
Resource resource = request.getResource();
LOG.info("Registering " + nodeId.toString());
// NOTE: this really should be checking against the config value
InetSocketAddress expected = NetUtils.getConnectAddress(
conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
Assert.assertEquals(NetUtils.getHostPortString(expected),
nodeId.toString());
Assert.assertEquals(5 * 1024, resource.getMemory());
registeredNodes.add(nodeId);
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
triggered = true;
return response;
}
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++);
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null,
null, null, null, 1000L);
return nhResponse;
}
}
@ -843,8 +909,7 @@ public void testNMConnectionToRM() throws Exception {
final long connectionRetryIntervalSecs = 1;
//Waiting for rmStartIntervalMS, RM will be started
final long rmStartIntervalMS = 2*1000;
YarnConfiguration conf = createNMConfig();
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
connectionWaitSecs);
conf.setLong(YarnConfiguration
.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
@ -907,8 +972,6 @@ protected NodeStatusUpdater createUpdater(Context context,
}
long duration = System.currentTimeMillis() - waitStartTime;
MyNodeStatusUpdater4 myUpdater = (MyNodeStatusUpdater4) updater;
Assert.assertTrue("Updater was never started",
myUpdater.getWaitStartTime()>0);
Assert.assertTrue("NM started before updater triggered",
myUpdater.isTriggered());
Assert.assertTrue("NM should have connected to RM after "
@ -1037,13 +1100,13 @@ public void testNodeStatusUpdaterRetryAndNMShutdown()
final long connectionWaitSecs = 1;
final long connectionRetryIntervalSecs = 1;
YarnConfiguration conf = createNMConfig();
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
connectionWaitSecs);
conf.setLong(YarnConfiguration
.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
connectionRetryIntervalSecs);
CyclicBarrier syncBarrier = new CyclicBarrier(2);
nm = new MyNodeManager2(syncBarrier);
nm = new MyNodeManager2(syncBarrier, conf);
nm.init(conf);
nm.start();
try {

View File

@ -117,6 +117,11 @@ protected ResourceTracker getRMClient() {
return new LocalRMInterface();
};
@Override
protected void stopRMProxy() {
return;
}
@Override
protected void startStatusUpdater() {
return; // Don't start any updating thread.

View File

@ -390,6 +390,11 @@ public RegisterNodeManagerResponse registerNodeManager(
}
};
};
@Override
protected void stopRMProxy() {
return;
}
};
};
}