YARN-2613. Support retry in NMClient for rolling-upgrades. (Contributed by Jian He)
This commit is contained in:
parent
8dfe54f6d3
commit
0708827a93
|
@ -121,6 +121,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-1972. Added a secure container-executor for Windows. (Remus Rusanu via
|
YARN-1972. Added a secure container-executor for Windows. (Remus Rusanu via
|
||||||
vinodkv)
|
vinodkv)
|
||||||
|
|
||||||
|
YARN-2613. Support retry in NMClient for rolling-upgrades. (Jian He via
|
||||||
|
junping_du)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
|
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
|
||||||
|
|
|
@ -1033,7 +1033,7 @@ public class YarnConfiguration extends Configuration {
|
||||||
/** Max time to wait to establish a connection to RM */
|
/** Max time to wait to establish a connection to RM */
|
||||||
public static final String RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
|
public static final String RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
|
||||||
RM_PREFIX + "connect.max-wait.ms";
|
RM_PREFIX + "connect.max-wait.ms";
|
||||||
public static final int DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
|
public static final long DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
|
||||||
15 * 60 * 1000;
|
15 * 60 * 1000;
|
||||||
|
|
||||||
/** Time interval between each attempt to connect to RM */
|
/** Time interval between each attempt to connect to RM */
|
||||||
|
@ -1370,6 +1370,18 @@ public class YarnConfiguration extends Configuration {
|
||||||
YARN_PREFIX + "client.max-nodemanagers-proxies";
|
YARN_PREFIX + "client.max-nodemanagers-proxies";
|
||||||
public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 500;
|
public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 500;
|
||||||
|
|
||||||
|
/** Max time to wait to establish a connection to NM */
|
||||||
|
public static final String CLIENT_NM_CONNECT_MAX_WAIT_MS =
|
||||||
|
YARN_PREFIX + "client.nodemanager-connect.max-wait-ms";
|
||||||
|
public static final long DEFAULT_CLIENT_NM_CONNECT_MAX_WAIT_MS =
|
||||||
|
15 * 60 * 1000;
|
||||||
|
|
||||||
|
/** Time interval between each attempt to connect to NM */
|
||||||
|
public static final String CLIENT_NM_CONNECT_RETRY_INTERVAL_MS =
|
||||||
|
YARN_PREFIX + "client.nodemanager-connect.retry-interval-ms";
|
||||||
|
public static final long DEFAULT_CLIENT_NM_CONNECT_RETRY_INTERVAL_MS
|
||||||
|
= 10 * 1000;
|
||||||
|
|
||||||
public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy";
|
public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy";
|
||||||
public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
|
public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
|
||||||
.name();
|
.name();
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.yarn.client.api.impl;
|
package org.apache.hadoop.yarn.client.api.impl;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -35,6 +34,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
|
import org.apache.hadoop.yarn.client.NMProxy;
|
||||||
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
@ -219,16 +219,8 @@ public class ContainerManagementProtocolProxy {
|
||||||
ConverterUtils.convertFromYarn(token, cmAddr);
|
ConverterUtils.convertFromYarn(token, cmAddr);
|
||||||
user.addToken(nmToken);
|
user.addToken(nmToken);
|
||||||
|
|
||||||
ContainerManagementProtocol proxy = user
|
return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
|
||||||
.doAs(new PrivilegedAction<ContainerManagementProtocol>() {
|
user, rpc, cmAddr);
|
||||||
|
|
||||||
@Override
|
|
||||||
public ContainerManagementProtocol run() {
|
|
||||||
return (ContainerManagementProtocol) rpc.getProxy(
|
|
||||||
ContainerManagementProtocol.class, cmAddr, conf);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return proxy;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ContainerManagementProtocol getContainerManagementProtocol() {
|
public ContainerManagementProtocol getContainerManagementProtocol() {
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.client;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public class NMProxy extends ServerProxy {
|
||||||
|
|
||||||
|
public static <T> T createNMProxy(final Configuration conf,
|
||||||
|
final Class<T> protocol, final UserGroupInformation ugi,
|
||||||
|
final YarnRPC rpc, final InetSocketAddress serverAddress) {
|
||||||
|
|
||||||
|
RetryPolicy retryPolicy =
|
||||||
|
createRetryPolicy(conf,
|
||||||
|
YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS,
|
||||||
|
YarnConfiguration.DEFAULT_CLIENT_NM_CONNECT_MAX_WAIT_MS,
|
||||||
|
YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS,
|
||||||
|
YarnConfiguration.DEFAULT_CLIENT_NM_CONNECT_RETRY_INTERVAL_MS);
|
||||||
|
|
||||||
|
return createRetriableProxy(conf, protocol, ugi, rpc, serverAddress,
|
||||||
|
retryPolicy);
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.client;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.NoRouteToHostException;
|
||||||
|
import java.net.SocketException;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -35,6 +38,8 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.io.retry.RetryProxy;
|
import org.apache.hadoop.io.retry.RetryProxy;
|
||||||
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
|
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
|
@ -165,7 +170,7 @@ public class RMProxy<T> {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static RetryPolicy createRetryPolicy(Configuration conf) {
|
public static RetryPolicy createRetryPolicy(Configuration conf) {
|
||||||
long rmConnectWaitMS =
|
long rmConnectWaitMS =
|
||||||
conf.getInt(
|
conf.getLong(
|
||||||
YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
|
YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
|
||||||
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
|
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
|
||||||
long rmConnectionRetryIntervalMS =
|
long rmConnectionRetryIntervalMS =
|
||||||
|
@ -234,9 +239,14 @@ public class RMProxy<T> {
|
||||||
|
|
||||||
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
|
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
|
||||||
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
||||||
|
|
||||||
exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
|
exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
|
||||||
//TO DO: after HADOOP-9576, IOException can be changed to EOFException
|
exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
|
||||||
exceptionToPolicyMap.put(IOException.class, retryPolicy);
|
exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
|
||||||
|
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
|
||||||
|
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
|
||||||
|
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
|
||||||
|
|
||||||
return RetryPolicies.retryByException(
|
return RetryPolicies.retryByException(
|
||||||
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.client;
|
||||||
|
|
||||||
|
import java.net.ConnectException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.NoRouteToHostException;
|
||||||
|
import java.net.SocketException;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
|
import org.apache.hadoop.io.retry.RetryProxy;
|
||||||
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
|
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public class ServerProxy {
|
||||||
|
|
||||||
|
protected static RetryPolicy createRetryPolicy(Configuration conf,
|
||||||
|
String maxWaitTimeStr, long defMaxWaitTime,
|
||||||
|
String connectRetryIntervalStr, long defRetryInterval) {
|
||||||
|
long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime);
|
||||||
|
long retryIntervalMS =
|
||||||
|
conf.getLong(connectRetryIntervalStr, defRetryInterval);
|
||||||
|
if (maxWaitTime == -1) {
|
||||||
|
// wait forever.
|
||||||
|
return RetryPolicies.RETRY_FOREVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. "
|
||||||
|
+ maxWaitTimeStr + " should be a positive value.");
|
||||||
|
Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. "
|
||||||
|
+ connectRetryIntervalStr + "should be a positive value.");
|
||||||
|
|
||||||
|
RetryPolicy retryPolicy =
|
||||||
|
RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime,
|
||||||
|
retryIntervalMS, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
|
||||||
|
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
||||||
|
exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
|
||||||
|
exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
|
||||||
|
exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
|
||||||
|
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
|
||||||
|
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
|
||||||
|
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
|
||||||
|
|
||||||
|
return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
|
||||||
|
exceptionToPolicyMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
protected static <T> T createRetriableProxy(final Configuration conf,
|
||||||
|
final Class<T> protocol, final UserGroupInformation user,
|
||||||
|
final YarnRPC rpc, final InetSocketAddress serverAddress,
|
||||||
|
RetryPolicy retryPolicy) {
|
||||||
|
T proxy = user.doAs(new PrivilegedAction<T>() {
|
||||||
|
@Override
|
||||||
|
public T run() {
|
||||||
|
return (T) rpc.getProxy(protocol, serverAddress, conf);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,15 +18,18 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.factories.impl.pb;
|
package org.apache.hadoop.yarn.factories.impl.pb;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
|
import java.lang.reflect.InvocationHandler;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Proxy;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
@ -87,15 +90,23 @@ public class RpcClientFactoryPBImpl implements RpcClientFactory {
|
||||||
@Override
|
@Override
|
||||||
public void stopClient(Object proxy) {
|
public void stopClient(Object proxy) {
|
||||||
try {
|
try {
|
||||||
Method closeMethod = proxy.getClass().getMethod("close");
|
if (proxy instanceof Closeable) {
|
||||||
closeMethod.invoke(proxy);
|
((Closeable) proxy).close();
|
||||||
} catch (InvocationTargetException e) {
|
return;
|
||||||
throw new YarnRuntimeException(e);
|
} else {
|
||||||
|
InvocationHandler handler = Proxy.getInvocationHandler(proxy);
|
||||||
|
if (handler instanceof Closeable) {
|
||||||
|
((Closeable) handler).close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Cannot call close method due to Exception. "
|
LOG.error("Cannot call close method due to Exception. " + "Ignoring.", e);
|
||||||
+ "Ignoring.", e);
|
|
||||||
throw new YarnRuntimeException(e);
|
throw new YarnRuntimeException(e);
|
||||||
}
|
}
|
||||||
|
throw new HadoopIllegalArgumentException(
|
||||||
|
"Cannot close proxy - is not Closeable or "
|
||||||
|
+ "does not provide closeable invocation handler " + proxy.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getPBImplClassName(Class<?> clazz) {
|
private String getPBImplClassName(Class<?> clazz) {
|
||||||
|
|
|
@ -1082,6 +1082,18 @@
|
||||||
<value>500</value>
|
<value>500</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Max time to wait to establish a connection to NM</description>
|
||||||
|
<name>yarn.client.nodemanager-connect.max-wait-ms</name>
|
||||||
|
<value>900000</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Time interval between each attempt to connect to NM</description>
|
||||||
|
<name>yarn.client.nodemanager-connect.retry-interval-ms</name>
|
||||||
|
<value>10000</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>
|
<description>
|
||||||
Maximum number of proxy connections for node manager. It should always be
|
Maximum number of proxy connections for node manager. It should always be
|
||||||
|
|
|
@ -0,0 +1,141 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.client.NMProxy;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestNMProxy extends BaseContainerManagerTest {
|
||||||
|
|
||||||
|
public TestNMProxy() throws UnsupportedFileSystemException {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
int retryCount = 0;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, 10000);
|
||||||
|
conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ContainerManagerImpl
|
||||||
|
createContainerManager(DeletionService delSrvc) {
|
||||||
|
return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
|
||||||
|
metrics, new ApplicationACLsManager(conf), dirsHandler) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StartContainersResponse startContainers(
|
||||||
|
StartContainersRequest requests) throws YarnException, IOException {
|
||||||
|
if (retryCount < 5) {
|
||||||
|
retryCount++;
|
||||||
|
throw new java.net.ConnectException("start container exception");
|
||||||
|
}
|
||||||
|
return super.startContainers(requests);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StopContainersResponse stopContainers(
|
||||||
|
StopContainersRequest requests) throws YarnException, IOException {
|
||||||
|
if (retryCount < 5) {
|
||||||
|
retryCount++;
|
||||||
|
throw new java.net.ConnectException("stop container exception");
|
||||||
|
}
|
||||||
|
return super.stopContainers(requests);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetContainerStatusesResponse getContainerStatuses(
|
||||||
|
GetContainerStatusesRequest request) throws YarnException,
|
||||||
|
IOException {
|
||||||
|
if (retryCount < 5) {
|
||||||
|
retryCount++;
|
||||||
|
throw new java.net.ConnectException("get container status exception");
|
||||||
|
}
|
||||||
|
return super.getContainerStatuses(request);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testNMProxyRetry() throws Exception {
|
||||||
|
containerManager.start();
|
||||||
|
containerManager.setBlockNewContainerRequests(false);
|
||||||
|
StartContainersRequest allRequests =
|
||||||
|
Records.newRecord(StartContainersRequest.class);
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||||
|
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
|
||||||
|
org.apache.hadoop.yarn.api.records.Token nmToken =
|
||||||
|
context.getNMTokenSecretManager().createNMToken(attemptId,
|
||||||
|
context.getNodeId(), user);
|
||||||
|
final InetSocketAddress address =
|
||||||
|
conf.getSocketAddr(YarnConfiguration.NM_BIND_HOST,
|
||||||
|
YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_PORT);
|
||||||
|
Token<NMTokenIdentifier> token =
|
||||||
|
ConverterUtils.convertFromYarn(nmToken,
|
||||||
|
SecurityUtil.buildTokenService(address));
|
||||||
|
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
|
||||||
|
ugi.addToken(token);
|
||||||
|
|
||||||
|
ContainerManagementProtocol proxy =
|
||||||
|
NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
|
||||||
|
YarnRPC.create(conf), address);
|
||||||
|
|
||||||
|
proxy.startContainers(allRequests);
|
||||||
|
Assert.assertEquals(5, retryCount);
|
||||||
|
|
||||||
|
retryCount = 0;
|
||||||
|
proxy.stopContainers(Records.newRecord(StopContainersRequest.class));
|
||||||
|
Assert.assertEquals(5, retryCount);
|
||||||
|
|
||||||
|
retryCount = 0;
|
||||||
|
proxy.getContainerStatuses(Records
|
||||||
|
.newRecord(GetContainerStatusesRequest.class));
|
||||||
|
Assert.assertEquals(5, retryCount);
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,7 +23,6 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -57,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
|
import org.apache.hadoop.yarn.client.NMProxy;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
@ -607,17 +607,9 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
if (nmToken != null) {
|
if (nmToken != null) {
|
||||||
ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
|
ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
|
||||||
}
|
}
|
||||||
|
proxy =
|
||||||
proxy = ugi
|
NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
|
||||||
.doAs(new PrivilegedAction<ContainerManagementProtocol>() {
|
rpc, addr);
|
||||||
|
|
||||||
@Override
|
|
||||||
public ContainerManagementProtocol run() {
|
|
||||||
return (ContainerManagementProtocol) rpc.getProxy(
|
|
||||||
ContainerManagementProtocol.class,
|
|
||||||
addr, conf);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return proxy;
|
return proxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue