diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index efb08326fbe..8b95dcb667a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -176,6 +176,9 @@ Release 2.4.0 - UNRELEASED YARN-1706. Created an utility method to dump timeline records to JSON strings. (zjshen) + YARN-1641. ZK store should attempt a write periodically to ensure it is + still Active. (kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index e603e9f8e66..05bfb3bb49e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -676,11 +676,11 @@ public abstract class RMStateStore extends AbstractService { @SuppressWarnings("unchecked") /** - * In {#handleStoreEvent}, this method is called to notify the - * ResourceManager that the store operation has failed. + * This method is called to notify the ResourceManager that the store + * operation has failed. * @param failureCause the exception due to which the operation failed */ - private void notifyStoreOperationFailed(Exception failureCause) { + protected void notifyStoreOperationFailed(Exception failureCause) { RMFatalEventType type; if (failureCause instanceof StoreFencedException) { type = RMFatalEventType.STATE_STORE_FENCED; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index d8fdaae0fdd..eebeee791bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -137,6 +137,7 @@ public class ZKRMStateStore extends RMStateStore { private String fencingNodePath; private Op createFencingNodePathOp; private Op deleteFencingNodePathOp; + private Thread verifyActiveStatusThread; private String zkRootNodeUsername; private final String zkRootNodePassword = Long.toString(random.nextLong()); @@ -258,6 +259,8 @@ public class ZKRMStateStore extends RMStateStore { createRootDir(zkRootNodePath); if (HAUtil.isHAEnabled(getConfig())){ fence(); + verifyActiveStatusThread = new VerifyActiveStatusThread(); + verifyActiveStatusThread.start(); } createRootDir(rmAppRoot); createRootDir(rmDTSecretManagerRoot); @@ -350,6 +353,10 @@ public class ZKRMStateStore extends RMStateStore { @Override protected synchronized void closeInternal() throws Exception { + if (verifyActiveStatusThread != null) { + verifyActiveStatusThread.interrupt(); + verifyActiveStatusThread.join(1000); + } closeZkClients(); } @@ -856,6 +863,32 @@ public class ZKRMStateStore extends RMStateStore { }.runWithRetries(); } + /** + * Helper class that periodically attempts creating a znode to ensure that + * this RM continues to be the Active. + */ + private class VerifyActiveStatusThread extends Thread { + private List emptyOpList = new ArrayList(); + + VerifyActiveStatusThread() { + super(VerifyActiveStatusThread.class.getName()); + } + + public void run() { + try { + while (true) { + doMultiWithRetries(emptyOpList); + Thread.sleep(zkSessionTimeout); + } + } catch (InterruptedException ie) { + LOG.info(VerifyActiveStatusThread.class.getName() + " thread " + + "interrupted! Exiting!"); + } catch (Exception e) { + notifyStoreOperationFailed(new StoreFencedException()); + } + } + } + private abstract class ZKAction { // run() expects synchronization on ZKRMStateStore.this abstract T run() throws KeeperException, InterruptedException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 41fdca24aef..48fede8c930 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -23,10 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,15 +31,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; @@ -54,6 +44,7 @@ import org.junit.Test; public class TestZKRMStateStore extends RMStateStoreTestBase { public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class); + private static final int ZK_TIMEOUT_MS = 1000; class TestZKRMStateStoreTester implements RMStateStoreHelper { @@ -141,6 +132,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); conf.set(YarnConfiguration.RM_HA_ID, rmId); for (String rpcAddress : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) { for (String id : HAUtil.getRMHAIds(conf)) { @@ -182,26 +174,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { HAServiceProtocol.HAServiceState.ACTIVE, rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); - // Submitting an application to RM1 to trigger a state store operation. - // RM1 should realize that it got fenced and is not the Active RM anymore. - Map mockMap = mock(Map.class); - ApplicationSubmissionContext asc = - ApplicationSubmissionContext.newInstance( - ApplicationId.newInstance(1000, 1), - "testApplication", // app Name - "default", // queue name - Priority.newInstance(0), - ContainerLaunchContext.newInstance(mockMap, mockMap, - new ArrayList(), mockMap, mock(ByteBuffer.class), - mockMap), - false, // unmanaged AM - true, // cancelTokens - 1, // max app attempts - Resource.newInstance(1024, 1)); - ClientRMService rmService = rm1.getClientRMService(); - rmService.submitApplication(SubmitApplicationRequest.newInstance(asc)); - - for (int i = 0; i < 30; i++) { + for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) { if (HAServiceProtocol.HAServiceState.ACTIVE == rm1.getRMContext().getRMAdminService().getServiceStatus().getState()) { Thread.sleep(100);