YARN-1559. Race between ServerRMProxy and ClientRMProxy setting RMProxy#INSTANCE. (kasha and vinodkv via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1555970 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-01-06 18:40:15 +00:00
parent 76238b9722
commit 2cddd21db9
5 changed files with 11 additions and 47 deletions

View File

@ -286,6 +286,9 @@ Release 2.4.0 - UNRELEASED
YARN-1549. Fixed a bug in ResourceManager's ApplicationMasterService that YARN-1549. Fixed a bug in ResourceManager's ApplicationMasterService that
was causing unamanged AMs to not finish correctly. (haosdent via vinodkv) was causing unamanged AMs to not finish correctly. (haosdent via vinodkv)
YARN-1559. Race between ServerRMProxy and ClientRMProxy setting
RMProxy#INSTANCE. (kasha and vinodkv via kasha)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -309,13 +309,4 @@
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" /> <Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />
<Bug pattern="IS2_INCONSISTENT_SYNC" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match> </Match>
<!-- Ignore INSTANCE not being final as it is created in sub-classes -->
<Match>
<Class name="org.apache.hadoop.yarn.client.RMProxy" />
<Field name="INSTANCE" />
<Bug pattern="MS_SHOULD_BE_FINAL"/>
</Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -39,16 +39,13 @@ import com.google.common.base.Preconditions;
public class ClientRMProxy<T> extends RMProxy<T> { public class ClientRMProxy<T> extends RMProxy<T> {
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
private static final ClientRMProxy INSTANCE = new ClientRMProxy();
private interface ClientRMProtocols extends ApplicationClientProtocol, private interface ClientRMProtocols extends ApplicationClientProtocol,
ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
// Add nothing // Add nothing
} }
static {
INSTANCE = new ClientRMProxy();
}
private ClientRMProxy(){ private ClientRMProxy(){
super(); super();
} }
@ -63,9 +60,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
*/ */
public static <T> T createRMProxy(final Configuration configuration, public static <T> T createRMProxy(final Configuration configuration,
final Class<T> protocol) throws IOException { final Class<T> protocol) throws IOException {
// This method exists only to initiate this class' static INSTANCE. TODO: return createRMProxy(configuration, protocol, INSTANCE);
// FIX if possible
return RMProxy.createRMProxy(configuration, protocol);
} }
private static void setupTokens(InetSocketAddress resourceManagerAddress) private static void setupTokens(InetSocketAddress resourceManagerAddress)

View File

@ -50,7 +50,6 @@ import com.google.common.annotations.VisibleForTesting;
public class RMProxy<T> { public class RMProxy<T> {
private static final Log LOG = LogFactory.getLog(RMProxy.class); private static final Log LOG = LogFactory.getLog(RMProxy.class);
protected static RMProxy INSTANCE;
protected RMProxy() {} protected RMProxy() {}
@ -79,17 +78,17 @@ public class RMProxy<T> {
*/ */
@Private @Private
protected static <T> T createRMProxy(final Configuration configuration, protected static <T> T createRMProxy(final Configuration configuration,
final Class<T> protocol) throws IOException { final Class<T> protocol, RMProxy instance) throws IOException {
YarnConfiguration conf = (configuration instanceof YarnConfiguration) YarnConfiguration conf = (configuration instanceof YarnConfiguration)
? (YarnConfiguration) configuration ? (YarnConfiguration) configuration
: new YarnConfiguration(configuration); : new YarnConfiguration(configuration);
RetryPolicy retryPolicy = createRetryPolicy(conf); RetryPolicy retryPolicy = createRetryPolicy(conf);
if (HAUtil.isHAEnabled(conf)) { if (HAUtil.isHAEnabled(conf)) {
RMFailoverProxyProvider<T> provider = RMFailoverProxyProvider<T> provider =
INSTANCE.createRMFailoverProxyProvider(conf, protocol); instance.createRMFailoverProxyProvider(conf, protocol);
return (T) RetryProxy.create(protocol, provider, retryPolicy); return (T) RetryProxy.create(protocol, provider, retryPolicy);
} else { } else {
InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol); InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress); T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
return (T) RetryProxy.create(protocol, proxy, retryPolicy); return (T) RetryProxy.create(protocol, proxy, retryPolicy);
@ -159,25 +158,6 @@ public class RMProxy<T> {
return provider; return provider;
} }
/**
* A RetryPolicy to allow failing over upto the specified maximum time.
*/
private static class FailoverUptoMaximumTimePolicy implements RetryPolicy {
private long maxTime;
FailoverUptoMaximumTimePolicy(long maxTime) {
this.maxTime = maxTime;
}
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isIdempotentOrAtMostOnce) throws Exception {
return System.currentTimeMillis() < maxTime
? RetryAction.FAILOVER_AND_RETRY
: RetryAction.FAIL;
}
}
/** /**
* Fetch retry policy from Configuration * Fetch retry policy from Configuration
*/ */

View File

@ -32,10 +32,7 @@ import com.google.common.base.Preconditions;
public class ServerRMProxy<T> extends RMProxy<T> { public class ServerRMProxy<T> extends RMProxy<T> {
private static final Log LOG = LogFactory.getLog(ServerRMProxy.class); private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
private static final ServerRMProxy INSTANCE = new ServerRMProxy();
static {
INSTANCE = new ServerRMProxy();
}
private ServerRMProxy() { private ServerRMProxy() {
super(); super();
@ -51,9 +48,7 @@ public class ServerRMProxy<T> extends RMProxy<T> {
*/ */
public static <T> T createRMProxy(final Configuration configuration, public static <T> T createRMProxy(final Configuration configuration,
final Class<T> protocol) throws IOException { final Class<T> protocol) throws IOException {
// This method exists only to initiate this class' static INSTANCE. TODO: return createRMProxy(configuration, protocol, INSTANCE);
// FIX if possible
return RMProxy.createRMProxy(configuration, protocol);
} }
@InterfaceAudience.Private @InterfaceAudience.Private