YARN-7053. Move curator transaction support to ZKCuratorManager. (Jonathan Hung via Subru).

(cherry picked from commit 4249172e1419acdb2b69ae3db43dc59da2aa2e03)
This commit is contained in:
Subru Krishnan 2017-08-22 19:20:57 -07:00
parent 2299c8dffa
commit a45ffdcddc
3 changed files with 164 additions and 102 deletions

View File

@ -26,6 +26,8 @@
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -54,7 +56,6 @@ public final class ZKCuratorManager {
/** Curator for ZooKeeper. */
private CuratorFramework curator;
public ZKCuratorManager(Configuration config) throws IOException {
this.conf = config;
}
@ -119,7 +120,6 @@ public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
/**
* Start the connection to the ZooKeeper ensemble.
* @param conf Configuration for the connection.
* @throws IOException If the connection cannot be started.
*/
public void start() throws IOException {
@ -128,7 +128,6 @@ public void start() throws IOException {
/**
* Start the connection to the ZooKeeper ensemble.
* @param conf Configuration for the connection.
* @param authInfos List of authentication keys.
* @throws IOException If the connection cannot be started.
*/
@ -337,4 +336,87 @@ public boolean delete(final String path) throws Exception {
public static String getNodePath(String root, String nodeName) {
return root + "/" + nodeName;
}
public void safeCreate(String path, byte[] data, List<ACL> acl,
CreateMode mode, List<ACL> fencingACL, String fencingNodePath)
throws Exception {
if (!exists(path)) {
SafeTransaction transaction = createTransaction(fencingACL,
fencingNodePath);
transaction.create(path, data, acl, mode);
transaction.commit();
}
}
/**
* Deletes the path. Checks for existence of path as well.
* @param path Path to be deleted.
* @throws Exception if any problem occurs while performing deletion.
*/
public void safeDelete(final String path, List<ACL> fencingACL,
String fencingNodePath) throws Exception {
if (exists(path)) {
SafeTransaction transaction = createTransaction(fencingACL,
fencingNodePath);
transaction.delete(path);
transaction.commit();
}
}
public void safeSetData(String path, byte[] data, int version,
List<ACL> fencingACL, String fencingNodePath)
throws Exception {
SafeTransaction transaction = createTransaction(fencingACL,
fencingNodePath);
transaction.setData(path, data, version);
transaction.commit();
}
public SafeTransaction createTransaction(List<ACL> fencingACL,
String fencingNodePath) throws Exception {
return new SafeTransaction(fencingACL, fencingNodePath);
}
/**
* Use curator transactions to ensure zk-operations are performed in an all
* or nothing fashion. This is equivalent to using ZooKeeper#multi.
*
* TODO (YARN-3774): Curator 3.0 introduces CuratorOp similar to Op. We ll
* have to rewrite this inner class when we adopt that.
*/
public class SafeTransaction {
private CuratorTransactionFinal transactionFinal;
private String fencingNodePath;
SafeTransaction(List<ACL> fencingACL, String fencingNodePath)
throws Exception {
this.fencingNodePath = fencingNodePath;
CuratorTransaction transaction = curator.inTransaction();
transactionFinal = transaction.create()
.withMode(CreateMode.PERSISTENT).withACL(fencingACL)
.forPath(fencingNodePath, new byte[0]).and();
}
public void commit() throws Exception {
transactionFinal = transactionFinal.delete()
.forPath(fencingNodePath).and();
transactionFinal.commit();
}
public void create(String path, byte[] data, List<ACL> acl, CreateMode mode)
throws Exception {
transactionFinal = transactionFinal.create()
.withMode(mode).withACL(acl).forPath(path, data).and();
}
public void delete(String path) throws Exception {
transactionFinal = transactionFinal.delete().forPath(path).and();
}
public void setData(String path, byte[] data, int version)
throws Exception {
transactionFinal = transactionFinal.setData()
.withVersion(version).forPath(path, data).and();
}
}
}

View File

@ -21,11 +21,15 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.List;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -92,4 +96,39 @@ public void testChildren() throws Exception {
children = curator.getChildren("/");
assertEquals(2, children.size());
}
@Test
public void testTransaction() throws Exception {
List<ACL> zkAcl = ZKUtil.parseACLs(CommonConfigurationKeys.ZK_ACL_DEFAULT);
String fencingNodePath = "/fencing";
String node1 = "/node1";
String node2 = "/node2";
byte[] testData = "testData".getBytes("UTF-8");
assertFalse(curator.exists(fencingNodePath));
assertFalse(curator.exists(node1));
assertFalse(curator.exists(node2));
ZKCuratorManager.SafeTransaction txn = curator.createTransaction(
zkAcl, fencingNodePath);
txn.create(node1, testData, zkAcl, CreateMode.PERSISTENT);
txn.create(node2, testData, zkAcl, CreateMode.PERSISTENT);
assertFalse(curator.exists(fencingNodePath));
assertFalse(curator.exists(node1));
assertFalse(curator.exists(node2));
txn.commit();
assertFalse(curator.exists(fencingNodePath));
assertTrue(curator.exists(node1));
assertTrue(curator.exists(node2));
assertTrue(Arrays.equals(testData, curator.getData(node1)));
assertTrue(Arrays.equals(testData, curator.getData(node2)));
byte[] setData = "setData".getBytes("UTF-8");
txn = curator.createTransaction(zkAcl, fencingNodePath);
txn.setData(node1, setData, -1);
txn.delete(node2);
assertTrue(curator.exists(node2));
assertTrue(Arrays.equals(testData, curator.getData(node1)));
txn.commit();
assertFalse(curator.exists(node2));
assertTrue(Arrays.equals(setData, curator.getData(node1)));
}
}

View File

@ -22,8 +22,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@ -31,6 +29,7 @@
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.util.curator.ZKCuratorManager.SafeTransaction;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -416,9 +415,10 @@ protected synchronized void storeVersion() throws Exception {
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (exists(versionNodePath)) {
safeSetData(versionNodePath, data, -1);
zkManager.safeSetData(versionNodePath, data, -1, zkAcl, fencingNodePath);
} else {
safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
zkManager.safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT,
zkAcl, fencingNodePath);
}
}
@ -447,12 +447,14 @@ public synchronized long getAndIncrementEpoch() throws Exception {
// increment epoch and store it
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray();
safeSetData(epochNodePath, storeData, -1);
zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl,
fencingNodePath);
} else {
// initialize epoch node with 1 for the next time.
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray();
safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
zkManager.safeCreate(epochNodePath, storeData, zkAcl,
CreateMode.PERSISTENT, zkAcl, fencingNodePath);
}
return currentEpoch;
@ -722,7 +724,7 @@ private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
// No apps stored under parent path.
if (children != null && children.isEmpty()) {
try {
safeDelete(parentAppNode);
zkManager.safeDelete(parentAppNode, zkAcl, fencingNodePath);
if (LOG.isDebugEnabled()) {
LOG.debug("No leaf app node exists. Removing parent node " +
parentAppNode);
@ -750,7 +752,8 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
byte[] appStateData = appStateDataPB.getProto().toByteArray();
if (appStateData.length <= zknodeLimit) {
safeCreate(nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
zkManager.safeCreate(nodeCreatePath, appStateData, zkAcl,
CreateMode.PERSISTENT, zkAcl, fencingNodePath);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Application state data size for " + appId + " is "
@ -781,7 +784,8 @@ protected synchronized void updateApplicationStateInternal(
String rootNode =
getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
if (!exists(rootNode)) {
safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT);
zkManager.safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT,
zkAcl, fencingNodePath);
}
}
}
@ -795,9 +799,11 @@ protected synchronized void updateApplicationStateInternal(
byte[] appStateData = appStateDataPB.getProto().toByteArray();
if (pathExists) {
safeSetData(nodeUpdatePath, appStateData, -1);
zkManager.safeSetData(nodeUpdatePath, appStateData, -1, zkAcl,
fencingNodePath);
} else {
safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
zkManager.safeCreate(nodeUpdatePath, appStateData, zkAcl,
CreateMode.PERSISTENT, zkAcl, fencingNodePath);
if (LOG.isDebugEnabled()) {
LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " +
"exist. Creating a new znode to update the application state.");
@ -840,9 +846,11 @@ private void handleApplicationAttemptStateOp(
switch (operation) {
case UPDATE:
if (exists(path)) {
safeSetData(path, attemptStateData, -1);
zkManager.safeSetData(path, attemptStateData, -1, zkAcl,
fencingNodePath);
} else {
safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
zkManager.safeCreate(path, attemptStateData, zkAcl,
CreateMode.PERSISTENT, zkAcl, fencingNodePath);
if (LOG.isDebugEnabled()) {
LOG.debug("Path " + path + " for " + appAttemptId + " didn't exist." +
" Created a new znode to update the application attempt state.");
@ -850,10 +858,11 @@ private void handleApplicationAttemptStateOp(
}
break;
case STORE:
safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
zkManager.safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT,
zkAcl, fencingNodePath);
break;
case REMOVE:
safeDelete(path);
zkManager.safeDelete(path, zkAcl, fencingNodePath);
break;
default:
break;
@ -931,10 +940,10 @@ private void removeApp(String removeAppId, boolean safeRemove,
for (ApplicationAttemptId attemptId : attempts) {
String attemptRemovePath =
getNodePath(appIdRemovePath, attemptId.toString());
safeDelete(attemptRemovePath);
zkManager.safeDelete(attemptRemovePath, zkAcl, fencingNodePath);
}
}
safeDelete(appIdRemovePath);
zkManager.safeDelete(appIdRemovePath, zkAcl, fencingNodePath);
} else {
CuratorFramework curatorFramework = zkManager.getCurator();
curatorFramework.delete().deletingChildrenIfNeeded().
@ -948,7 +957,7 @@ private void removeApp(String removeAppId, boolean safeRemove,
protected synchronized void storeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
SafeTransaction trx = new SafeTransaction();
SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
trx.commit();
}
@ -965,14 +974,14 @@ protected synchronized void removeRMDelegationTokenState(
+ rmDTIdentifier.getSequenceNumber());
}
safeDelete(nodeRemovePath);
zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
}
@Override
protected synchronized void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
SafeTransaction trx = new SafeTransaction();
SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
@ -1036,8 +1045,8 @@ protected synchronized void storeRMDTMasterKeyState(
ByteArrayOutputStream os = new ByteArrayOutputStream();
try(DataOutputStream fsOut = new DataOutputStream(os)) {
delegationKey.write(fsOut);
safeCreate(nodeCreatePath, os.toByteArray(), zkAcl,
CreateMode.PERSISTENT);
zkManager.safeCreate(nodeCreatePath, os.toByteArray(), zkAcl,
CreateMode.PERSISTENT, zkAcl, fencingNodePath);
}
}
@ -1052,7 +1061,7 @@ protected synchronized void removeRMDTMasterKeyState(
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
}
safeDelete(nodeRemovePath);
zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
}
@Override
@ -1079,7 +1088,8 @@ protected synchronized void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
byte[] stateData = data.getProto().toByteArray();
safeSetData(amrmTokenSecretManagerRoot, stateData, -1);
zkManager.safeSetData(amrmTokenSecretManagerRoot, stateData, -1, zkAcl,
fencingNodePath);
}
@Override
@ -1093,12 +1103,12 @@ protected synchronized void removeReservationState(String planName,
+ " for" + " plan " + planName);
}
safeDelete(reservationPath);
zkManager.safeDelete(reservationPath, zkAcl, fencingNodePath);
List<String> reservationNodes = getChildren(planNodePath);
if (reservationNodes.isEmpty()) {
safeDelete(planNodePath);
zkManager.safeDelete(planNodePath, zkAcl, fencingNodePath);
}
}
@ -1106,7 +1116,7 @@ protected synchronized void removeReservationState(String planName,
protected synchronized void storeReservationState(
ReservationAllocationStateProto reservationAllocation, String planName,
String reservationIdName) throws Exception {
SafeTransaction trx = new SafeTransaction();
SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
addOrUpdateReservationState(reservationAllocation, planName,
reservationIdName, trx, false);
trx.commit();
@ -1192,7 +1202,8 @@ private String getLeafAppIdNodePath(String appId, String rootNode,
getNodePath(rootNode, nodeName.substring(0, splitIdx));
if (createParentIfNotExists && !exists(rootNodePath)) {
try {
safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT);
zkManager.safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT,
zkAcl, fencingNodePath);
} catch (KeeperException.NodeExistsException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to create app parent node " + rootNodePath +
@ -1249,76 +1260,6 @@ void delete(final String path) throws Exception {
zkManager.delete(path);
}
private void safeCreate(String path, byte[] data, List<ACL> acl,
CreateMode mode) throws Exception {
if (!exists(path)) {
SafeTransaction transaction = new SafeTransaction();
transaction.create(path, data, acl, mode);
transaction.commit();
}
}
/**
* Deletes the path. Checks for existence of path as well.
* @param path Path to be deleted.
* @throws Exception if any problem occurs while performing deletion.
*/
private void safeDelete(final String path) throws Exception {
if (exists(path)) {
SafeTransaction transaction = new SafeTransaction();
transaction.delete(path);
transaction.commit();
}
}
private void safeSetData(String path, byte[] data, int version)
throws Exception {
SafeTransaction transaction = new SafeTransaction();
transaction.setData(path, data, version);
transaction.commit();
}
/**
* Use curator transactions to ensure zk-operations are performed in an all
* or nothing fashion. This is equivalent to using ZooKeeper#multi.
*
* TODO (YARN-3774): Curator 3.0 introduces CuratorOp similar to Op. We ll
* have to rewrite this inner class when we adopt that.
*/
private class SafeTransaction {
private CuratorTransactionFinal transactionFinal;
SafeTransaction() throws Exception {
CuratorFramework curatorFramework = zkManager.getCurator();
CuratorTransaction transaction = curatorFramework.inTransaction();
transactionFinal = transaction.create()
.withMode(CreateMode.PERSISTENT).withACL(zkAcl)
.forPath(fencingNodePath, new byte[0]).and();
}
public void commit() throws Exception {
transactionFinal = transactionFinal.delete()
.forPath(fencingNodePath).and();
transactionFinal.commit();
}
public void create(String path, byte[] data, List<ACL> acl, CreateMode mode)
throws Exception {
transactionFinal = transactionFinal.create()
.withMode(mode).withACL(acl).forPath(path, data).and();
}
public void delete(String path) throws Exception {
transactionFinal = transactionFinal.delete().forPath(path).and();
}
public void setData(String path, byte[] data, int version)
throws Exception {
transactionFinal = transactionFinal.setData()
.withVersion(version).forPath(path, data).and();
}
}
/**
* Helper class that periodically attempts creating a znode to ensure that
* this RM continues to be the Active.
@ -1333,7 +1274,7 @@ public void run() {
try {
while (!isFencedState()) {
// Create and delete fencing node
new SafeTransaction().commit();
zkManager.createTransaction(zkAcl, fencingNodePath).commit();
Thread.sleep(zkSessionTimeout);
}
} catch (InterruptedException ie) {