diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java index 934f70b2166..61d27cc294f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java @@ -56,8 +56,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore { private Configuration conf; private LogMutation pendingMutation; - private String znodeParentPath; - private static final String ZK_VERSION_PATH = "VERSION"; private static final String LOGS_PATH = "LOGS"; private static final String CONF_STORE_PATH = "CONF_STORE"; @@ -70,19 +68,20 @@ public class ZKConfigurationStore extends YarnConfigurationStore { private String fencingNodePath; private String confVersionPath; - @VisibleForTesting - protected ZKCuratorManager zkManager; + private ZKCuratorManager zkManager; private List zkAcl; @Override public void initialize(Configuration config, Configuration schedConf, RMContext rmContext) throws Exception { this.conf = config; + + String znodeParentPath = conf.get( + YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH); + this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS); - this.znodeParentPath = - conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH, - YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH); this.zkManager = rmContext.getResourceManager().createAndStartZKManager(conf); this.zkAcl = ZKCuratorManager.getZKAcls(conf); @@ -96,37 +95,31 @@ public class ZKConfigurationStore extends YarnConfigurationStore { zkManager.createRootDirRecursively(znodeParentPath, zkAcl); zkManager.delete(fencingNodePath); - if (!zkManager.exists(logsPath)) { - zkManager.create(logsPath); - zkManager.setData(logsPath, - serializeObject(new LinkedList()), -1); + if (createNewZkPath(logsPath)) { + setZkData(logsPath, new LinkedList()); } - if (!zkManager.exists(confVersionPath)) { - zkManager.create(confVersionPath); - zkManager.setData(confVersionPath, String.valueOf(0), -1); + if (createNewZkPath(confVersionPath)) { + setZkData(confVersionPath, String.valueOf(0)); } - if (!zkManager.exists(confStorePath)) { - zkManager.create(confStorePath); + if (createNewZkPath(confStorePath)) { HashMap mapSchedConf = new HashMap<>(); for (Map.Entry entry : schedConf) { mapSchedConf.put(entry.getKey(), entry.getValue()); } - zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1); + setZkData(confStorePath, mapSchedConf); long configVersion = getConfigVersion() + 1L; - zkManager.setData(confVersionPath, String.valueOf(configVersion), -1); + setZkData(confVersionPath, String.valueOf(configVersion)); } } @VisibleForTesting @Override protected LinkedList getLogs() throws Exception { - return (LinkedList) - deserializeObject(zkManager.getData(logsPath)); + return unsafeCast(deserializeObject(getZkData(logsPath))); } - // TODO: following version-related code is taken from ZKRMStateStore @Override public Version getCurrentVersion() { return CURRENT_VERSION_INFO; @@ -135,7 +128,7 @@ public class ZKConfigurationStore extends YarnConfigurationStore { @Override public Version getConfStoreVersion() throws Exception { if (zkManager.exists(zkVersionPath)) { - byte[] data = zkManager.getData(zkVersionPath); + byte[] data = getZkData(zkVersionPath); return new VersionPBImpl(YarnServerCommonProtos.VersionProto .parseFrom(data)); } @@ -154,26 +147,24 @@ public class ZKConfigurationStore extends YarnConfigurationStore { ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); if (zkManager.exists(zkVersionPath)) { - zkManager.safeSetData(zkVersionPath, data, -1, zkAcl, fencingNodePath); + safeSetZkData(zkVersionPath, data); } else { - zkManager.safeCreate(zkVersionPath, data, zkAcl, CreateMode.PERSISTENT, - zkAcl, fencingNodePath); + safeCreateZkData(zkVersionPath, data); } } @Override public void logMutation(LogMutation logMutation) throws Exception { - byte[] storedLogs = zkManager.getData(logsPath); + byte[] storedLogs = getZkData(logsPath); LinkedList logs = new LinkedList<>(); if (storedLogs != null) { - logs = (LinkedList) deserializeObject(storedLogs); + logs = unsafeCast(deserializeObject(storedLogs)); } logs.add(logMutation); if (logs.size() > maxLogs) { logs.remove(logs.removeFirst()); } - zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl, - fencingNodePath); + safeSetZkData(logsPath, logs); pendingMutation = logMutation; } @@ -194,10 +185,9 @@ public class ZKConfigurationStore extends YarnConfigurationStore { mapConf.put(confChange.getKey(), confChange.getValue()); } } - zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1, - zkAcl, fencingNodePath); + safeSetZkData(confStorePath, mapConf); long configVersion = getConfigVersion() + 1L; - zkManager.setData(confVersionPath, String.valueOf(configVersion), -1); + setZkData(confVersionPath, String.valueOf(configVersion)); } pendingMutation = null; @@ -207,14 +197,14 @@ public class ZKConfigurationStore extends YarnConfigurationStore { public synchronized Configuration retrieve() { byte[] serializedSchedConf; try { - serializedSchedConf = zkManager.getData(confStorePath); + serializedSchedConf = getZkData(confStorePath); } catch (Exception e) { LOG.error("Failed to retrieve configuration from zookeeper store", e); return null; } try { Map map = - (HashMap) deserializeObject(serializedSchedConf); + unsafeCast(deserializeObject(serializedSchedConf)); Configuration c = new Configuration(false); for (Map.Entry e : map.entrySet()) { c.set(e.getKey(), e.getValue()); @@ -229,7 +219,14 @@ public class ZKConfigurationStore extends YarnConfigurationStore { @Override public long getConfigVersion() throws Exception { - return Long.parseLong(zkManager.getStringData(confVersionPath)); + String version = zkManager.getStringData(confVersionPath); + if (version == null) { + throw new IllegalStateException("Config version can not be properly " + + "serialized. Check Zookeeper config version path to locate " + + "the error!"); + } + + return Long.parseLong(version); } @Override @@ -237,6 +234,55 @@ public class ZKConfigurationStore extends YarnConfigurationStore { return null; // unimplemented } + /** + * Creates a new path in Zookeeper only, if it does not already exist. + * + * @param path Value of the Zookeeper path + * @return trueif the creation executed; false + * otherwise. + * @throws Exception + */ + private boolean createNewZkPath(String path) throws Exception { + if (!zkManager.exists(path)) { + zkManager.create(path); + return true; + } else { + return false; + } + } + + @VisibleForTesting + protected byte[] getZkData(String path) throws Exception { + return zkManager.getData(path); + } + + @VisibleForTesting + protected void setZkData(String path, byte[] data) throws Exception { + zkManager.setData(path, data, -1); + } + + private void setZkData(String path, Object data) throws Exception { + setZkData(path, serializeObject(data)); + } + + private void setZkData(String path, String data) throws Exception { + zkManager.setData(path, data, -1); + } + + private void safeSetZkData(String path, byte[] data) throws Exception { + zkManager.safeSetData(path, data, -1, zkAcl, fencingNodePath); + } + + private void safeSetZkData(String path, Object data) throws Exception { + safeSetZkData(path, serializeObject(data)); + } + + @VisibleForTesting + protected void safeCreateZkData(String path, byte[] data) throws Exception { + zkManager.safeCreate(path, data, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); + } + private static String getNodePath(String root, String nodeName) { return ZKCuratorManager.getNodePath(root, nodeName); } @@ -257,4 +303,18 @@ public class ZKConfigurationStore extends YarnConfigurationStore { return ois.readObject(); } } + + /** + * Casts an object of type Object to type T. It is essential to emphasize, + * that it is an unsafe operation. + * + * @param o Object to be cast from + * @param Type to cast to + * @return casted object of type T + * @throws ClassCastException + */ + @SuppressWarnings("unchecked") + private static T unsafeCast(Object o) throws ClassCastException { + return (T)o; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java index e50f84e393d..40454a4621b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java @@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.data.ACL; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -49,9 +47,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -113,18 +112,11 @@ public class TestZKConfigurationStore extends confStore.initialize(conf, schedConf, rmContext); Version otherVersion = Version.newInstance(1, 1); - String znodeParentPath = conf.get(YarnConfiguration. - RM_SCHEDCONF_STORE_ZK_PARENT_PATH, - YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH); - String zkVersionPath = ZKCuratorManager.getNodePath(znodeParentPath, - "VERSION"); - String fencingNodePath = ZKCuratorManager.getNodePath(znodeParentPath, - "FENCING"); + String zkVersionPath = getZkPath("VERSION"); byte[] versionData = ((VersionPBImpl) otherVersion).getProto().toByteArray(); - List zkAcl = ZKCuratorManager.getZKAcls(conf); - ((ZKConfigurationStore) confStore).zkManager.safeCreate(zkVersionPath, - versionData, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath); + ((ZKConfigurationStore) confStore).safeCreateZkData(zkVersionPath, + versionData); assertEquals("The configuration store should have stored the new" + "version.", otherVersion, confStore.getConfStoreVersion()); @@ -140,20 +132,58 @@ public class TestZKConfigurationStore extends assertNull(confStore.retrieve()); } + @Test(expected = IllegalStateException.class) + public void testGetConfigurationVersionOnSerializedNullData() + throws Exception { + confStore.initialize(conf, schedConf, rmContext); + String confVersionPath = getZkPath("CONF_VERSION"); + ((ZKConfigurationStore) confStore).setZkData(confVersionPath, null); + confStore.getConfigVersion(); + } + + /** + * The correct behavior of logMutation should be, that even though an + * Exception is thrown during serialization, the log data must not be + * overridden. + * + * @throws Exception + */ + @Test(expected = ClassCastException.class) + public void testLogMutationAfterSerializationError() throws Exception { + byte[] data = null; + String logs = "NOT_LINKED_LIST"; + confStore.initialize(conf, schedConf, rmContext); + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(logs); + oos.flush(); + baos.flush(); + data = baos.toByteArray(); + } + + String logsPath = getZkPath("LOGS"); + ((ZKConfigurationStore)confStore).setZkData(logsPath, data); + + Map update = new HashMap<>(); + update.put("valid_key", "valid_value"); + + confStore.logMutation(new YarnConfigurationStore.LogMutation(update, TEST_USER)); + + assertEquals(data, ((ZKConfigurationStore)confStore).getZkData(logsPath)); + } + @Test public void testDisableAuditLogs() throws Exception { conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0); confStore.initialize(conf, schedConf, rmContext); - String znodeParentPath = conf.get(YarnConfiguration. - RM_SCHEDCONF_STORE_ZK_PARENT_PATH, - YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH); - String logsPath = ZKCuratorManager.getNodePath(znodeParentPath, "LOGS"); + String logsPath = getZkPath("LOGS"); byte[] data = null; - ((ZKConfigurationStore) confStore).zkManager.setData(logsPath, data, -1); + ((ZKConfigurationStore) confStore).setZkData(logsPath, data); prepareLogMutation("key1", "val1"); - data = ((ZKConfigurationStore) confStore).zkManager.getData(logsPath); + data = ((ZKConfigurationStore) confStore).getZkData(logsPath); assertNull("Failed to Disable Audit Logs", data); } @@ -373,6 +403,13 @@ public class TestZKConfigurationStore extends return new ZKConfigurationStore(); } + private String getZkPath(String nodeName) { + String znodeParentPath = conf.get(YarnConfiguration. + RM_SCHEDCONF_STORE_ZK_PARENT_PATH, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH); + return ZKCuratorManager.getNodePath(znodeParentPath, nodeName); + } + @Override Version getVersion() { return ZKConfigurationStore.CURRENT_VERSION_INFO;