YARN-7720. Race condition between second app attempt and UAM timeout when first attempt node is down. (#5672)
This commit is contained in:
parent
97afb33c73
commit
86c250a54a
|
@ -394,7 +394,8 @@ public class YarnConfiguration extends Configuration {
|
||||||
/** The expiry interval for application master reporting.*/
|
/** The expiry interval for application master reporting.*/
|
||||||
public static final String RM_AM_EXPIRY_INTERVAL_MS =
|
public static final String RM_AM_EXPIRY_INTERVAL_MS =
|
||||||
YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms";
|
YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms";
|
||||||
public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000;
|
public static final long DEFAULT_RM_AM_EXPIRY_INTERVAL_MS =
|
||||||
|
TimeUnit.MINUTES.toMillis(15);
|
||||||
|
|
||||||
/** How long to wait until a node manager is considered dead.*/
|
/** How long to wait until a node manager is considered dead.*/
|
||||||
public static final String RM_NM_EXPIRY_INTERVAL_MS =
|
public static final String RM_NM_EXPIRY_INTERVAL_MS =
|
||||||
|
|
|
@ -83,7 +83,7 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
|
||||||
|
|
||||||
protected abstract void expire(O ob);
|
protected abstract void expire(O ob);
|
||||||
|
|
||||||
protected void setExpireInterval(int expireInterval) {
|
protected void setExpireInterval(long expireInterval) {
|
||||||
this.expireInterval = expireInterval;
|
this.expireInterval = expireInterval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -169,9 +169,12 @@
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>The expiry interval for application master reporting.</description>
|
<description>
|
||||||
|
The expiry interval for application master reporting.
|
||||||
|
The default is 900000 ms, or 15m.
|
||||||
|
</description>
|
||||||
<name>yarn.am.liveness-monitor.expiry-interval-ms</name>
|
<name>yarn.am.liveness-monitor.expiry-interval-ms</name>
|
||||||
<value>600000</value>
|
<value>900000</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -19,7 +19,9 @@
|
||||||
package org.apache.hadoop.yarn.conf;
|
package org.apache.hadoop.yarn.conf;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
@ -247,4 +249,21 @@ public class TestYarnConfiguration {
|
||||||
assertNull(conf.get(
|
assertNull(conf.get(
|
||||||
HAUtil.addSuffix(YarnConfiguration.NM_LOCALIZER_ADDRESS, "rm1")));
|
HAUtil.addSuffix(YarnConfiguration.NM_LOCALIZER_ADDRESS, "rm1")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void checkRmAmExpiryIntervalSetting() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
|
||||||
|
// 30m, 1800000ms
|
||||||
|
conf.set(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, "30m");
|
||||||
|
long rmAmExpiryIntervalMS = conf.getTimeDuration(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS, TimeUnit.MILLISECONDS);
|
||||||
|
assertEquals(1800000, rmAmExpiryIntervalMS);
|
||||||
|
|
||||||
|
// 10m, 600000ms
|
||||||
|
conf.set(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, "600000");
|
||||||
|
String rmAmExpiryIntervalMS1 = conf.get(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
|
||||||
|
assertTrue(NumberUtils.isDigits(rmAmExpiryIntervalMS1));
|
||||||
|
assertEquals(600000, Long.parseLong(rmAmExpiryIntervalMS1));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,9 +24,12 @@ import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -88,12 +91,17 @@ public class AMRMProxyTokenSecretManager extends
|
||||||
YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
|
YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
|
||||||
// Adding delay = 1.5 * expiry interval makes sure that all active AMs get
|
// Adding delay = 1.5 * expiry interval makes sure that all active AMs get
|
||||||
// the updated shared-key.
|
// the updated shared-key.
|
||||||
this.activationDelay =
|
String rmAmExpiryIntervalMS = conf.get(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
|
||||||
(long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
if (NumberUtils.isDigits(rmAmExpiryIntervalMS)) {
|
||||||
|
this.activationDelay = (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
|
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
|
||||||
LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
|
} else {
|
||||||
+ "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay
|
this.activationDelay = (long) (conf.getTimeDuration(
|
||||||
+ " ms");
|
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS, TimeUnit.MILLISECONDS) * 1.5);
|
||||||
|
}
|
||||||
|
LOG.info("AMRMTokenKeyRollingInterval: {} ms and AMRMTokenKeyActivationDelay: {} ms.",
|
||||||
|
this.rollingInterval, this.activationDelay);
|
||||||
if (rollingInterval <= activationDelay * 2) {
|
if (rollingInterval <= activationDelay * 2) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
|
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.classification.VisibleForTesting;
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import com.sun.jersey.spi.container.servlet.ServletContainer;
|
import com.sun.jersey.spi.container.servlet.ServletContainer;
|
||||||
|
@ -703,6 +704,32 @@ public class ResourceManager extends CompositeService
|
||||||
+ ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "="
|
+ ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "="
|
||||||
+ heartbeatIntvl);
|
+ heartbeatIntvl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (HAUtil.isFederationEnabled(conf)) {
|
||||||
|
/*
|
||||||
|
* In Yarn Federation, we need UAMs in secondary sub-clusters to stay
|
||||||
|
* alive when the next attempt AM in home sub-cluster gets launched. If
|
||||||
|
* the previous AM died because its node was lost after the NM timeout, it will
|
||||||
|
* already be too late if AM timeout is even shorter.
|
||||||
|
*/
|
||||||
|
String rmAmExpiryIntervalMS = conf.get(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
|
||||||
|
long amExpireIntvl;
|
||||||
|
if (NumberUtils.isDigits(rmAmExpiryIntervalMS)) {
|
||||||
|
amExpireIntvl = conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
|
||||||
|
} else {
|
||||||
|
amExpireIntvl = conf.getTimeDuration(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (amExpireIntvl <= expireIntvl) {
|
||||||
|
throw new YarnRuntimeException("When Yarn Federation is enabled, "
|
||||||
|
+ "AM expiry interval should be no less than NM expiry interval, "
|
||||||
|
+ YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS + "=" + amExpireIntvl
|
||||||
|
+ ", " + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS + "="
|
||||||
|
+ expireIntvl);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -27,6 +28,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
|
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {
|
public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {
|
||||||
|
|
||||||
private EventHandler<Event> dispatcher;
|
private EventHandler<Event> dispatcher;
|
||||||
|
@ -43,8 +46,15 @@ public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAt
|
||||||
|
|
||||||
public void serviceInit(Configuration conf) throws Exception {
|
public void serviceInit(Configuration conf) throws Exception {
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
int expireIntvl = conf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
long expireIntvl;
|
||||||
|
String rmAmExpiryIntervalMS = conf.get(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
|
||||||
|
if (NumberUtils.isDigits(rmAmExpiryIntervalMS)) {
|
||||||
|
expireIntvl = conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
|
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
|
||||||
|
} else {
|
||||||
|
expireIntvl = conf.getTimeDuration(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
setExpireInterval(expireIntvl);
|
setExpireInterval(expireIntvl);
|
||||||
setMonitorInterval(expireIntvl/3);
|
setMonitorInterval(expireIntvl/3);
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,10 +24,12 @@ import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
@ -90,11 +92,18 @@ public class AMRMTokenSecretManager extends
|
||||||
YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
|
YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
|
||||||
// Adding delay = 1.5 * expiry interval makes sure that all active AMs get
|
// Adding delay = 1.5 * expiry interval makes sure that all active AMs get
|
||||||
// the updated shared-key.
|
// the updated shared-key.
|
||||||
this.activationDelay =
|
String rmAmExpiryIntervalMS = conf.get(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
|
||||||
(long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
if (NumberUtils.isDigits(rmAmExpiryIntervalMS)) {
|
||||||
|
this.activationDelay = (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
|
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
|
||||||
LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
|
} else {
|
||||||
+ "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms");
|
this.activationDelay =
|
||||||
|
(long) (conf.getTimeDuration(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS, TimeUnit.MILLISECONDS) * 1.5);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("AMRMTokenKeyRollingInterval: {} ms and AMRMTokenKeyActivationDelay: {} ms",
|
||||||
|
this.rollingInterval, this.activationDelay);
|
||||||
if (rollingInterval <= activationDelay * 2) {
|
if (rollingInterval <= activationDelay * 2) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
|
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
|
||||||
|
|
Loading…
Reference in New Issue