YARN-3094. Reset timer for liveness monitors after RM recovery. Contributed by Jun Gong

(cherry picked from commit 0af6a99a3f)
This commit is contained in:
Jian He 2015-02-09 13:47:08 -08:00
parent ff900eb64a
commit 6146680955
5 changed files with 100 additions and 0 deletions

View File

@ -482,6 +482,9 @@ Release 2.7.0 - UNRELEASED
YARN-3143. RM Apps REST API can return NPE or entries missing id and other
fields (jlowe)
YARN-3094. Reset timer for liveness monitors after RM recovery. (Jun Gong
via jianhe)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -59,6 +59,7 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
@Override
protected void serviceStart() throws Exception {
assert !stopped : "starting when already stopped";
resetTimer();
checkerThread = new Thread(new PingChecker());
checkerThread.setName("Ping Checker");
checkerThread.start();
@ -99,6 +100,13 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
running.remove(ob);
}
/**
 * Stamps every currently tracked object with the present clock time,
 * so each one starts a fresh liveness interval from now.
 */
public synchronized void resetTimer() {
  final long now = clock.getTime();
  // Overwrite each value in place; the set of tracked keys is left untouched.
  for (java.util.Map.Entry<O, Long> tracked : running.entrySet()) {
    tracked.setValue(now);
  }
}
private class PingChecker implements Runnable {
@Override

View File

@ -564,12 +564,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
if(recoveryEnabled) {
try {
LOG.info("Recovery started");
rmStore.checkVersion();
if (rmContext.isWorkPreservingRecoveryEnabled()) {
rmContext.setEpoch(rmStore.getAndIncrementEpoch());
}
RMState state = rmStore.loadState();
recover(state);
LOG.info("Recovery ended");
} catch (Exception e) {
// the Exception from loadState() needs to be handled for
// HA and we need to give up master status if we got fenced

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {
@ -35,6 +36,11 @@ public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAt
this.dispatcher = d.getEventHandler();
}
/**
 * Creates an AM liveliness monitor that uses the supplied clock.
 *
 * @param d dispatcher whose event handler is kept by this monitor
 * @param clock clock handed to the superclass constructor
 */
public AMLivelinessMonitor(Dispatcher d, Clock clock) {
super("AMLivelinessMonitor", clock);
this.dispatcher = d.getEventHandler();
}
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
int expireIntvl = conf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Mockito.mock;
/**
 * Tests for {@link AMLivelinessMonitor}.
 */
public class TestAMLivelinessMonitor {

  /**
   * Verifies that an attempt registered before RM start is not expired when
   * the clock jumps past the expiry interval while the state store is loaded.
   *
   * The attempt is registered at clock time 0, the state store moves the
   * clock to 8000 inside {@code loadState()}, and the AM expiry interval is
   * 6000. Unless the monitor's timer is reset when it starts, the attempt
   * would look overdue and be expired.
   */
  @Test(timeout = 10000)
  public void testResetTimer() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    UserGroupInformation.setConfiguration(conf);
    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 6000);

    final ControlledClock clock = new ControlledClock(new SystemClock());
    clock.setTime(0);

    // Loading the state advances the clock beyond the 6000 ms expiry interval.
    MemoryRMStateStore memStore = new MemoryRMStateStore() {
      @Override
      public synchronized RMState loadState() throws Exception {
        clock.setTime(8000);
        return super.loadState();
      }
    };
    memStore.init(conf);

    final ApplicationAttemptId attemptId = mock(ApplicationAttemptId.class);
    final Dispatcher dispatcher = mock(Dispatcher.class);

    // Written from the monitor's checker thread and read from the test thread,
    // so use an atomic flag to guarantee visibility across threads.
    final java.util.concurrent.atomic.AtomicBoolean expired =
        new java.util.concurrent.atomic.AtomicBoolean(false);

    final AMLivelinessMonitor monitor = new AMLivelinessMonitor(
        dispatcher, clock) {
      @Override
      protected void expire(ApplicationAttemptId id) {
        // JUnit convention: expected value first, actual value second.
        Assert.assertEquals(attemptId, id);
        expired.set(true);
      }
    };
    monitor.register(attemptId);

    MockRM rm = new MockRM(conf, memStore) {
      @Override
      protected AMLivelinessMonitor createAMLivelinessMonitor() {
        return monitor;
      }
    };
    try {
      rm.start();
      // make sure that monitor has started
      while (monitor.getServiceState() != Service.STATE.STARTED) {
        Thread.sleep(100);
      }
      // expired would be set to true without resetTimer
      Assert.assertFalse(expired.get());
    } finally {
      // Always stop the RM so a failed assertion does not leak a running RM.
      rm.stop();
    }
  }
}