YARN-1641. ZK store should attempt a write periodically to ensure it is still Active. (kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1567629 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4e3aff0e25
commit
ee0d2f6eab
|
@ -161,6 +161,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
YARN-1706. Created an utility method to dump timeline records to JSON
|
YARN-1706. Created an utility method to dump timeline records to JSON
|
||||||
strings. (zjshen)
|
strings. (zjshen)
|
||||||
|
|
||||||
|
YARN-1641. ZK store should attempt a write periodically to ensure it is
|
||||||
|
still Active. (kasha)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -676,11 +676,11 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
/**
|
/**
|
||||||
* In {#handleStoreEvent}, this method is called to notify the
|
* This method is called to notify the ResourceManager that the store
|
||||||
* ResourceManager that the store operation has failed.
|
* operation has failed.
|
||||||
* @param failureCause the exception due to which the operation failed
|
* @param failureCause the exception due to which the operation failed
|
||||||
*/
|
*/
|
||||||
private void notifyStoreOperationFailed(Exception failureCause) {
|
protected void notifyStoreOperationFailed(Exception failureCause) {
|
||||||
RMFatalEventType type;
|
RMFatalEventType type;
|
||||||
if (failureCause instanceof StoreFencedException) {
|
if (failureCause instanceof StoreFencedException) {
|
||||||
type = RMFatalEventType.STATE_STORE_FENCED;
|
type = RMFatalEventType.STATE_STORE_FENCED;
|
||||||
|
|
|
@ -137,6 +137,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
private String fencingNodePath;
|
private String fencingNodePath;
|
||||||
private Op createFencingNodePathOp;
|
private Op createFencingNodePathOp;
|
||||||
private Op deleteFencingNodePathOp;
|
private Op deleteFencingNodePathOp;
|
||||||
|
private Thread verifyActiveStatusThread;
|
||||||
private String zkRootNodeUsername;
|
private String zkRootNodeUsername;
|
||||||
private final String zkRootNodePassword = Long.toString(random.nextLong());
|
private final String zkRootNodePassword = Long.toString(random.nextLong());
|
||||||
|
|
||||||
|
@ -258,6 +259,8 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
createRootDir(zkRootNodePath);
|
createRootDir(zkRootNodePath);
|
||||||
if (HAUtil.isHAEnabled(getConfig())){
|
if (HAUtil.isHAEnabled(getConfig())){
|
||||||
fence();
|
fence();
|
||||||
|
verifyActiveStatusThread = new VerifyActiveStatusThread();
|
||||||
|
verifyActiveStatusThread.start();
|
||||||
}
|
}
|
||||||
createRootDir(rmAppRoot);
|
createRootDir(rmAppRoot);
|
||||||
createRootDir(rmDTSecretManagerRoot);
|
createRootDir(rmDTSecretManagerRoot);
|
||||||
|
@ -350,6 +353,10 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void closeInternal() throws Exception {
|
protected synchronized void closeInternal() throws Exception {
|
||||||
|
if (verifyActiveStatusThread != null) {
|
||||||
|
verifyActiveStatusThread.interrupt();
|
||||||
|
verifyActiveStatusThread.join(1000);
|
||||||
|
}
|
||||||
closeZkClients();
|
closeZkClients();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -856,6 +863,32 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}.runWithRetries();
|
}.runWithRetries();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class that periodically attempts creating a znode to ensure that
|
||||||
|
* this RM continues to be the Active.
|
||||||
|
*/
|
||||||
|
private class VerifyActiveStatusThread extends Thread {
|
||||||
|
private List<Op> emptyOpList = new ArrayList<Op>();
|
||||||
|
|
||||||
|
VerifyActiveStatusThread() {
|
||||||
|
super(VerifyActiveStatusThread.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
doMultiWithRetries(emptyOpList);
|
||||||
|
Thread.sleep(zkSessionTimeout);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.info(VerifyActiveStatusThread.class.getName() + " thread " +
|
||||||
|
"interrupted! Exiting!");
|
||||||
|
} catch (Exception e) {
|
||||||
|
notifyStoreOperationFailed(new StoreFencedException());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private abstract class ZKAction<T> {
|
private abstract class ZKAction<T> {
|
||||||
// run() expects synchronization on ZKRMStateStore.this
|
// run() expects synchronization on ZKRMStateStore.this
|
||||||
abstract T run() throws KeeperException, InterruptedException;
|
abstract T run() throws KeeperException, InterruptedException;
|
||||||
|
|
|
@ -23,10 +23,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -34,15 +31,8 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
||||||
|
@ -54,6 +44,7 @@ import org.junit.Test;
|
||||||
public class TestZKRMStateStore extends RMStateStoreTestBase {
|
public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
|
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
|
||||||
|
private static final int ZK_TIMEOUT_MS = 1000;
|
||||||
|
|
||||||
class TestZKRMStateStoreTester implements RMStateStoreHelper {
|
class TestZKRMStateStoreTester implements RMStateStoreHelper {
|
||||||
|
|
||||||
|
@ -141,6 +132,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
|
||||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
|
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
|
||||||
|
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
||||||
for (String rpcAddress : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
|
for (String rpcAddress : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
|
||||||
for (String id : HAUtil.getRMHAIds(conf)) {
|
for (String id : HAUtil.getRMHAIds(conf)) {
|
||||||
|
@ -182,26 +174,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
HAServiceProtocol.HAServiceState.ACTIVE,
|
HAServiceProtocol.HAServiceState.ACTIVE,
|
||||||
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
|
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
|
||||||
|
|
||||||
// Submitting an application to RM1 to trigger a state store operation.
|
for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) {
|
||||||
// RM1 should realize that it got fenced and is not the Active RM anymore.
|
|
||||||
Map mockMap = mock(Map.class);
|
|
||||||
ApplicationSubmissionContext asc =
|
|
||||||
ApplicationSubmissionContext.newInstance(
|
|
||||||
ApplicationId.newInstance(1000, 1),
|
|
||||||
"testApplication", // app Name
|
|
||||||
"default", // queue name
|
|
||||||
Priority.newInstance(0),
|
|
||||||
ContainerLaunchContext.newInstance(mockMap, mockMap,
|
|
||||||
new ArrayList<String>(), mockMap, mock(ByteBuffer.class),
|
|
||||||
mockMap),
|
|
||||||
false, // unmanaged AM
|
|
||||||
true, // cancelTokens
|
|
||||||
1, // max app attempts
|
|
||||||
Resource.newInstance(1024, 1));
|
|
||||||
ClientRMService rmService = rm1.getClientRMService();
|
|
||||||
rmService.submitApplication(SubmitApplicationRequest.newInstance(asc));
|
|
||||||
|
|
||||||
for (int i = 0; i < 30; i++) {
|
|
||||||
if (HAServiceProtocol.HAServiceState.ACTIVE ==
|
if (HAServiceProtocol.HAServiceState.ACTIVE ==
|
||||||
rm1.getRMContext().getRMAdminService().getServiceStatus().getState()) {
|
rm1.getRMContext().getRMAdminService().getServiceStatus().getState()) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
Loading…
Reference in New Issue