YARN-2613. Support retry in NMClient for rolling-upgrades. (Contributed by Jian He)
This commit is contained in:
parent
364c110c2b
commit
8871bf37fe
|
@ -73,6 +73,9 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2581. Passed LogAggregationContext to NM via ContainerTokenIdentifier.
|
||||
(Xuan Gong via zjshen)
|
||||
|
||||
YARN-2613. Support retry in NMClient for rolling-upgrades. (Jian He via
|
||||
junping_du)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-2242. Improve exception information on AM launch crashes. (Li Lu
|
||||
|
|
|
@ -1027,7 +1027,7 @@ public class YarnConfiguration extends Configuration {
|
|||
/** Max time to wait to establish a connection to RM */
|
||||
public static final String RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
|
||||
RM_PREFIX + "connect.max-wait.ms";
|
||||
public static final int DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
|
||||
public static final long DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
|
||||
15 * 60 * 1000;
|
||||
|
||||
/** Time interval between each attempt to connect to RM */
|
||||
|
@ -1342,6 +1342,18 @@ public class YarnConfiguration extends Configuration {
|
|||
YARN_PREFIX + "client.max-nodemanagers-proxies";
|
||||
public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 500;
|
||||
|
||||
/** Max time to wait to establish a connection to NM */
|
||||
public static final String CLIENT_NM_CONNECT_MAX_WAIT_MS =
|
||||
YARN_PREFIX + "client.nodemanager-connect.max-wait-ms";
|
||||
public static final long DEFAULT_CLIENT_NM_CONNECT_MAX_WAIT_MS =
|
||||
15 * 60 * 1000;
|
||||
|
||||
/** Time interval between each attempt to connect to NM */
|
||||
public static final String CLIENT_NM_CONNECT_RETRY_INTERVAL_MS =
|
||||
YARN_PREFIX + "client.nodemanager-connect.retry-interval-ms";
|
||||
public static final long DEFAULT_CLIENT_NM_CONNECT_RETRY_INTERVAL_MS
|
||||
= 10 * 1000;
|
||||
|
||||
public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy";
|
||||
public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
|
||||
.name();
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
@ -35,6 +34,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.NMProxy;
|
||||
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
@ -219,16 +219,8 @@ public class ContainerManagementProtocolProxy {
|
|||
ConverterUtils.convertFromYarn(token, cmAddr);
|
||||
user.addToken(nmToken);
|
||||
|
||||
ContainerManagementProtocol proxy = user
|
||||
.doAs(new PrivilegedAction<ContainerManagementProtocol>() {
|
||||
|
||||
@Override
|
||||
public ContainerManagementProtocol run() {
|
||||
return (ContainerManagementProtocol) rpc.getProxy(
|
||||
ContainerManagementProtocol.class, cmAddr, conf);
|
||||
}
|
||||
});
|
||||
return proxy;
|
||||
return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
|
||||
user, rpc, cmAddr);
|
||||
}
|
||||
|
||||
public ContainerManagementProtocol getContainerManagementProtocol() {
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public class NMProxy extends ServerProxy {
|
||||
|
||||
public static <T> T createNMProxy(final Configuration conf,
|
||||
final Class<T> protocol, final UserGroupInformation ugi,
|
||||
final YarnRPC rpc, final InetSocketAddress serverAddress) {
|
||||
|
||||
RetryPolicy retryPolicy =
|
||||
createRetryPolicy(conf,
|
||||
YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS,
|
||||
YarnConfiguration.DEFAULT_CLIENT_NM_CONNECT_MAX_WAIT_MS,
|
||||
YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_CLIENT_NM_CONNECT_RETRY_INTERVAL_MS);
|
||||
|
||||
return createRetriableProxy(conf, protocol, ugi, rpc, serverAddress,
|
||||
retryPolicy);
|
||||
}
|
||||
}
|
|
@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.client;
|
|||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.NoRouteToHostException;
|
||||
import java.net.SocketException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
@ -35,6 +38,8 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryProxy;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
|
@ -165,7 +170,7 @@ public class RMProxy<T> {
|
|||
@VisibleForTesting
|
||||
public static RetryPolicy createRetryPolicy(Configuration conf) {
|
||||
long rmConnectWaitMS =
|
||||
conf.getInt(
|
||||
conf.getLong(
|
||||
YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
|
||||
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
|
||||
long rmConnectionRetryIntervalMS =
|
||||
|
@ -234,9 +239,14 @@ public class RMProxy<T> {
|
|||
|
||||
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
|
||||
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
||||
|
||||
exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
|
||||
//TO DO: after HADOOP-9576, IOException can be changed to EOFException
|
||||
exceptionToPolicyMap.put(IOException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
|
||||
|
||||
return RetryPolicies.retryByException(
|
||||
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.NoRouteToHostException;
|
||||
import java.net.SocketException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryProxy;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public class ServerProxy {
|
||||
|
||||
protected static RetryPolicy createRetryPolicy(Configuration conf,
|
||||
String maxWaitTimeStr, long defMaxWaitTime,
|
||||
String connectRetryIntervalStr, long defRetryInterval) {
|
||||
long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime);
|
||||
long retryIntervalMS =
|
||||
conf.getLong(connectRetryIntervalStr, defRetryInterval);
|
||||
if (maxWaitTime == -1) {
|
||||
// wait forever.
|
||||
return RetryPolicies.RETRY_FOREVER;
|
||||
}
|
||||
|
||||
Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. "
|
||||
+ maxWaitTimeStr + " should be a positive value.");
|
||||
Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. "
|
||||
+ connectRetryIntervalStr + "should be a positive value.");
|
||||
|
||||
RetryPolicy retryPolicy =
|
||||
RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime,
|
||||
retryIntervalMS, TimeUnit.MILLISECONDS);
|
||||
|
||||
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
|
||||
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
||||
exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
|
||||
|
||||
return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
|
||||
exceptionToPolicyMap);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected static <T> T createRetriableProxy(final Configuration conf,
|
||||
final Class<T> protocol, final UserGroupInformation user,
|
||||
final YarnRPC rpc, final InetSocketAddress serverAddress,
|
||||
RetryPolicy retryPolicy) {
|
||||
T proxy = user.doAs(new PrivilegedAction<T>() {
|
||||
@Override
|
||||
public T run() {
|
||||
return (T) rpc.getProxy(protocol, serverAddress, conf);
|
||||
}
|
||||
});
|
||||
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
||||
}
|
||||
}
|
|
@ -18,15 +18,18 @@
|
|||
|
||||
package org.apache.hadoop.yarn.factories.impl.pb;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
@ -87,15 +90,23 @@ public class RpcClientFactoryPBImpl implements RpcClientFactory {
|
|||
@Override
|
||||
public void stopClient(Object proxy) {
|
||||
try {
|
||||
Method closeMethod = proxy.getClass().getMethod("close");
|
||||
closeMethod.invoke(proxy);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
if (proxy instanceof Closeable) {
|
||||
((Closeable) proxy).close();
|
||||
return;
|
||||
} else {
|
||||
InvocationHandler handler = Proxy.getInvocationHandler(proxy);
|
||||
if (handler instanceof Closeable) {
|
||||
((Closeable) handler).close();
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot call close method due to Exception. "
|
||||
+ "Ignoring.", e);
|
||||
LOG.error("Cannot call close method due to Exception. " + "Ignoring.", e);
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Cannot close proxy - is not Closeable or "
|
||||
+ "does not provide closeable invocation handler " + proxy.getClass());
|
||||
}
|
||||
|
||||
private String getPBImplClassName(Class<?> clazz) {
|
||||
|
|
|
@ -1080,7 +1080,19 @@
|
|||
<name>yarn.client.nodemanager-client-async.thread-pool-max-size</name>
|
||||
<value>500</value>
|
||||
</property>
|
||||
|
||||
|
||||
<property>
|
||||
<description>Max time to wait to establish a connection to NM</description>
|
||||
<name>yarn.client.nodemanager-connect.max-wait-ms</name>
|
||||
<value>900000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Time interval between each attempt to connect to NM</description>
|
||||
<name>yarn.client.nodemanager-connect.retry-interval-ms</name>
|
||||
<value>10000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Maximum number of proxy connections for node manager. It should always be
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.client.NMProxy;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestNMProxy extends BaseContainerManagerTest {
|
||||
|
||||
public TestNMProxy() throws UnsupportedFileSystemException {
|
||||
super();
|
||||
}
|
||||
|
||||
int retryCount = 0;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, 10000);
|
||||
conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 100);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainerManagerImpl
|
||||
createContainerManager(DeletionService delSrvc) {
|
||||
return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
|
||||
metrics, new ApplicationACLsManager(conf), dirsHandler) {
|
||||
|
||||
@Override
|
||||
public StartContainersResponse startContainers(
|
||||
StartContainersRequest requests) throws YarnException, IOException {
|
||||
if (retryCount < 5) {
|
||||
retryCount++;
|
||||
throw new java.net.ConnectException("start container exception");
|
||||
}
|
||||
return super.startContainers(requests);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StopContainersResponse stopContainers(
|
||||
StopContainersRequest requests) throws YarnException, IOException {
|
||||
if (retryCount < 5) {
|
||||
retryCount++;
|
||||
throw new java.net.ConnectException("stop container exception");
|
||||
}
|
||||
return super.stopContainers(requests);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainerStatusesResponse getContainerStatuses(
|
||||
GetContainerStatusesRequest request) throws YarnException,
|
||||
IOException {
|
||||
if (retryCount < 5) {
|
||||
retryCount++;
|
||||
throw new java.net.ConnectException("get container status exception");
|
||||
}
|
||||
return super.getContainerStatuses(request);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testNMProxyRetry() throws Exception {
|
||||
containerManager.start();
|
||||
containerManager.setBlockNewContainerRequests(false);
|
||||
StartContainersRequest allRequests =
|
||||
Records.newRecord(StartContainersRequest.class);
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||
|
||||
org.apache.hadoop.yarn.api.records.Token nmToken =
|
||||
context.getNMTokenSecretManager().createNMToken(attemptId,
|
||||
context.getNodeId(), user);
|
||||
final InetSocketAddress address =
|
||||
conf.getSocketAddr(YarnConfiguration.NM_BIND_HOST,
|
||||
YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_NM_PORT);
|
||||
Token<NMTokenIdentifier> token =
|
||||
ConverterUtils.convertFromYarn(nmToken,
|
||||
SecurityUtil.buildTokenService(address));
|
||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
|
||||
ugi.addToken(token);
|
||||
|
||||
ContainerManagementProtocol proxy =
|
||||
NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
|
||||
YarnRPC.create(conf), address);
|
||||
|
||||
proxy.startContainers(allRequests);
|
||||
Assert.assertEquals(5, retryCount);
|
||||
|
||||
retryCount = 0;
|
||||
proxy.stopContainers(Records.newRecord(StopContainersRequest.class));
|
||||
Assert.assertEquals(5, retryCount);
|
||||
|
||||
retryCount = 0;
|
||||
proxy.getContainerStatuses(Records
|
||||
.newRecord(GetContainerStatusesRequest.class));
|
||||
Assert.assertEquals(5, retryCount);
|
||||
}
|
||||
}
|
|
@ -23,7 +23,6 @@ import static org.junit.Assert.fail;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -57,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.NMProxy;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
|
@ -607,17 +607,9 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
|||
if (nmToken != null) {
|
||||
ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
|
||||
}
|
||||
|
||||
proxy = ugi
|
||||
.doAs(new PrivilegedAction<ContainerManagementProtocol>() {
|
||||
|
||||
@Override
|
||||
public ContainerManagementProtocol run() {
|
||||
return (ContainerManagementProtocol) rpc.getProxy(
|
||||
ContainerManagementProtocol.class,
|
||||
addr, conf);
|
||||
}
|
||||
});
|
||||
proxy =
|
||||
NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
|
||||
rpc, addr);
|
||||
return proxy;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue