YARN-4584. RM startup failure when AM attempts greater than max-attempts. (Bibin A Chundatt via rohithsharmaks)
This commit is contained in:
parent
b2ffcc2915
commit
e30668106d
|
@ -137,6 +137,9 @@ Release 2.9.0 - UNRELEASED
|
||||||
YARN-4611. Fix scheduler load simulator to support multi-layer network
|
YARN-4611. Fix scheduler load simulator to support multi-layer network
|
||||||
location. (Ming Ma via xgong)
|
location. (Ming Ma via xgong)
|
||||||
|
|
||||||
|
YARN-4584. RM startup failure when AM attempts greater than max-attempts.
|
||||||
|
(Bibin A Chundatt via rohithsharmaks)
|
||||||
|
|
||||||
Release 2.8.0 - UNRELEASED
|
Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -841,7 +841,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
this.startTime = appState.getStartTime();
|
this.startTime = appState.getStartTime();
|
||||||
this.callerContext = appState.getCallerContext();
|
this.callerContext = appState.getCallerContext();
|
||||||
// If interval > 0, some attempts might have been deleted.
|
// If interval > 0, some attempts might have been deleted.
|
||||||
if (submissionContext.getAttemptFailuresValidityInterval() > 0) {
|
if (this.attemptFailuresValidityInterval > 0) {
|
||||||
this.firstAttemptIdInStateStore = appState.getFirstAttemptId();
|
this.firstAttemptIdInStateStore = appState.getFirstAttemptId();
|
||||||
this.nextAttemptId = firstAttemptIdInStateStore;
|
this.nextAttemptId = firstAttemptIdInStateStore;
|
||||||
}
|
}
|
||||||
|
@ -1341,7 +1341,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
+ "is " + numberOfFailure + ". The max attempts is "
|
+ "is " + numberOfFailure + ". The max attempts is "
|
||||||
+ app.maxAppAttempts);
|
+ app.maxAppAttempts);
|
||||||
|
|
||||||
removeExcessAttempts(app);
|
if (app.attemptFailuresValidityInterval > 0) {
|
||||||
|
removeExcessAttempts(app);
|
||||||
|
}
|
||||||
|
|
||||||
if (!app.submissionContext.getUnmanagedAM()
|
if (!app.submissionContext.getUnmanagedAM()
|
||||||
&& numberOfFailure < app.maxAppAttempts) {
|
&& numberOfFailure < app.maxAppAttempts) {
|
||||||
|
@ -1381,15 +1383,22 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeExcessAttempts(RMAppImpl app) {
|
private void removeExcessAttempts(RMAppImpl app) {
|
||||||
while (app.nextAttemptId - app.firstAttemptIdInStateStore
|
while (app.nextAttemptId
|
||||||
> app.maxAppAttempts) {
|
- app.firstAttemptIdInStateStore > app.maxAppAttempts) {
|
||||||
// attempts' first element is oldest attempt because it is a
|
// attempts' first element is oldest attempt because it is a
|
||||||
// LinkedHashMap
|
// LinkedHashMap
|
||||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
||||||
app.getApplicationId(), app.firstAttemptIdInStateStore);
|
app.getApplicationId(), app.firstAttemptIdInStateStore);
|
||||||
app.firstAttemptIdInStateStore++;
|
RMAppAttempt rmAppAttempt = app.getRMAppAttempt(attemptId);
|
||||||
LOG.info("Remove attempt from state store : " + attemptId);
|
long endTime = app.systemClock.getTime();
|
||||||
app.rmContext.getStateStore().removeApplicationAttempt(attemptId);
|
if (rmAppAttempt.getFinishTime() < (endTime
|
||||||
|
- app.attemptFailuresValidityInterval)) {
|
||||||
|
app.firstAttemptIdInStateStore++;
|
||||||
|
LOG.info("Remove attempt from state store : " + attemptId);
|
||||||
|
app.rmContext.getStateStore().removeApplicationAttempt(attemptId);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,8 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
@ -104,6 +106,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
@ -121,6 +125,7 @@ import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestRMRestart.class);
|
||||||
private final static File TEMP_DIR = new File(System.getProperty(
|
private final static File TEMP_DIR = new File(System.getProperty(
|
||||||
"test.build.data", "/tmp"), "decommision");
|
"test.build.data", "/tmp"), "decommision");
|
||||||
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
|
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
|
||||||
|
@ -2321,4 +2326,57 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testRMRestartAfterPreemption() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
if (!getSchedulerType().equals(SchedulerType.CAPACITY)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
// start RM
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
int CONTAINER_MEMORY = 1024;
|
||||||
|
// create app and launch the AM
|
||||||
|
RMApp app0 = rm1.submitApp(CONTAINER_MEMORY);
|
||||||
|
MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||||
|
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
|
||||||
|
ContainerState.COMPLETE);
|
||||||
|
am0.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||||
|
am0.registerAppAttempt();
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
// get scheduler app
|
||||||
|
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
|
||||||
|
.get(app0.getApplicationId()).getCurrentAppAttempt();
|
||||||
|
// kill app0-attempt
|
||||||
|
cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(
|
||||||
|
app0.getCurrentAppAttempt().getMasterContainer().getId()));
|
||||||
|
}
|
||||||
|
am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||||
|
am0.registerAppAttempt();
|
||||||
|
rm1.killApp(app0.getApplicationId());
|
||||||
|
rm1.waitForState(app0.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
RMAppAttemptState.KILLED);
|
||||||
|
|
||||||
|
MockRM rm2 = null;
|
||||||
|
// start RM2
|
||||||
|
try {
|
||||||
|
rm2 = new MockRM(conf, memStore);
|
||||||
|
rm2.start();
|
||||||
|
Assert.assertTrue("RM start successfully", true);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.debug("Exception on start", e);
|
||||||
|
Assert.fail("RM should start with out any issue");
|
||||||
|
} finally {
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue