From b0803388fc5ec03b774aa003f52232deb8db6f69 Mon Sep 17 00:00:00 2001 From: Kai Zheng Date: Fri, 21 Apr 2017 13:33:33 +0800 Subject: [PATCH] HDFS-11604. Define and parse erasure code policies. Contributed by Lin Zeng --- .../hadoop/hdfs/util/ECPolicyLoader.java | 328 ++++++++++++++++++ .../hadoop/hdfs/util/TestECPolicyLoader.java | 313 +++++++++++++++++ .../main/conf/user_ec_policies.xml.template | 71 ++++ .../src/site/markdown/HDFSErasureCoding.md | 10 + 4 files changed, 722 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ECPolicyLoader.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestECPolicyLoader.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/conf/user_ec_policies.xml.template diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ECPolicyLoader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ECPolicyLoader.java new file mode 100644 index 00000000000..e75f0917858 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ECPolicyLoader.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.w3c.dom.Node; +import org.w3c.dom.Text; +import org.w3c.dom.Element; +import org.w3c.dom.Document; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.Map; +import java.util.List; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Collections; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A EC policy loading tool that loads user defined EC policies from XML file. + */ +@InterfaceAudience.Private +public class ECPolicyLoader { + + private static final Logger LOG + = LoggerFactory.getLogger(ECPolicyLoader.class); + + private static final int LAYOUT_VERSION = 1; + + /** + * Load user defined EC policies from a XML configuration file. 
+   * @param policyFilePath path of EC policy file
+   * @return all valid EC policies in EC policy file, or an empty list when
+   *         no policy file could be located
+   * @throws RuntimeException if the file cannot be read or parsed; the
+   *         underlying parser or I/O exception is attached as the cause
+   */
+  public List<ErasureCodingPolicy> loadPolicy(String policyFilePath) {
+    File policyFile = getPolicyFile(policyFilePath);
+    if (policyFile == null) {
+      LOG.warn("Not found any EC policy file");
+      return Collections.emptyList();
+    }
+
+    try {
+      return loadECPolicies(policyFile);
+    } catch (ParserConfigurationException | IOException | SAXException e) {
+      // Chain the original failure so the real reason is not lost.
+      throw new RuntimeException("Failed to load EC policy file: "
+          + policyFile, e);
+    }
+  }
+
+  /**
+   * Load EC policies from an XML configuration file.
+   *
+   * The document must have a <configuration> root that contains, checked in
+   * this order, a <layoutversion> equal to LAYOUT_VERSION, a <schemas>
+   * element and a <policies> element.
+   *
+   * @param policyFile EC policy file
+   * @return list of EC policies
+   * @throws ParserConfigurationException if the XML parser cannot be
+   *         configured
+   * @throws IOException if the EC policy file cannot be read
+   * @throws SAXException if the file is not well-formed XML
+   * @throws RuntimeException if a required element is missing or the layout
+   *         version is not LAYOUT_VERSION
+   */
+  private List<ErasureCodingPolicy> loadECPolicies(File policyFile)
+      throws ParserConfigurationException, IOException, SAXException {
+
+    LOG.info("Loading EC policy file " + policyFile);
+
+    // Read and parse the EC policy file. Secure processing is switched on
+    // and DOCTYPE declarations are refused, so the file cannot pull in
+    // external entities; a policy file has no need for a DTD.
+    DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+    dbf.setFeature(javax.xml.XMLConstants.FEATURE_SECURE_PROCESSING, true);
+    dbf.setFeature(
+        "http://apache.org/xml/features/disallow-doctype-decl", true);
+    dbf.setIgnoringComments(true);
+    DocumentBuilder builder = dbf.newDocumentBuilder();
+    Document doc = builder.parse(policyFile);
+    Element root = doc.getDocumentElement();
+
+    if (!"configuration".equals(root.getTagName())) {
+      throw new RuntimeException("Bad EC policy configuration file: "
+          + "top-level element not <configuration>");
+    }
+
+    if (root.getElementsByTagName("layoutversion").getLength() == 0) {
+      throw new RuntimeException("Bad EC policy configuration file: "
+          + "no <layoutversion> element");
+    }
+    if (loadLayoutVersion(root) != LAYOUT_VERSION) {
+      throw new RuntimeException("The parse failed because of "
+          + "bad layoutversion value");
+    }
+    if (root.getElementsByTagName("schemas").getLength() == 0) {
+      throw new RuntimeException("Bad EC policy configuration file: "
+          + "no <schemas> element");
+    }
+
+    // Schemas are loaded before <policies> is looked for, so a schema error
+    // is reported ahead of a missing <policies> element.
+    Map<String, ECSchema> schemas = loadSchemas(root);
+    if (root.getElementsByTagName("policies").getLength() == 0) {
+      throw new RuntimeException("Bad EC policy configuration file: "
+          + "no <policies> element");
+    }
+
+    return loadPolicies(root, schemas);
+  }
+
+  /**
+   * Load layoutVersion from root element in the XML configuration file.
+   * Only the first <layoutversion> element is read; the caller has already
+   * checked that at least one exists.
+   *
+   * @param root root element
+   * @return layout version
+   * @throws IllegalArgumentException if the element is empty, does not hold
+   *         plain text, or its text is not an integer
+   */
+  private int loadLayoutVersion(Element root) {
+    Node firstChild = root.getElementsByTagName("layoutversion")
+        .item(0).getFirstChild();
+    if (firstChild == null) {
+      throw new IllegalArgumentException("Value of <layoutversion> is null");
+    }
+    if (!(firstChild instanceof Text)) {
+      // A nested element here used to surface as a ClassCastException.
+      throw new IllegalArgumentException("Value of <layoutversion> must be"
+          + " plain text");
+    }
+
+    String value = ((Text) firstChild).getData().trim();
+    try {
+      return Integer.parseInt(value);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Bad layoutVersion value "
+          + value + " is found. It should be an integer", e);
+    }
+  }
+
+  /**
+   * Load schemas from root element in the XML configuration file.
+   * Non-element children of <schemas> are skipped.
+   *
+   * @param root root element
+   * @return EC schema map, keyed by the id attribute of each <schema>
+   * @throws RuntimeException if <schemas> holds an element other than
+   *         <schema>, or if a schema id or a schema definition is repeated
+   */
+  private Map<String, ECSchema> loadSchemas(Element root) {
+    NodeList elements = root.getElementsByTagName("schemas")
+        .item(0).getChildNodes();
+    Map<String, ECSchema> schemas = new HashMap<>();
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (!(node instanceof Element)) {
+        continue;
+      }
+
+      Element element = (Element) node;
+      if (!"schema".equals(element.getTagName())) {
+        throw new RuntimeException("Bad element in EC policy"
+            + " configuration file: " + element.getTagName());
+      }
+
+      String schemaId = element.getAttribute("id");
+      ECSchema schema = loadSchema(element);
+      if (schemas.containsKey(schemaId)) {
+        // Without this check a later schema silently replaced an earlier
+        // one that used the same id.
+        throw new RuntimeException("Repetitive schema id in EC policy"
+            + " configuration file: " + schemaId);
+      }
+      if (schemas.containsValue(schema)) {
+        throw new RuntimeException("Repetitive schemas in EC policy"
+            + " configuration file: " + schemaId);
+      }
+      schemas.put(schemaId, schema);
+    }
+
+    return schemas;
+  }
+
+  /**
+   * Load EC policies from root element in the XML configuration file.
+   * Non-element children of <policies> are skipped. A policy equal to one
+   * already loaded is logged and dropped rather than treated as an error.
+   *
+   * @param root root element
+   * @param schemas schema map
+   * @return EC policy list, in document order
+   * @throws RuntimeException if <policies> holds an element other than
+   *         <policy>
+   */
+  private List<ErasureCodingPolicy> loadPolicies(
+      Element root, Map<String, ECSchema> schemas) {
+    NodeList elements = root.getElementsByTagName("policies")
+        .item(0).getChildNodes();
+    List<ErasureCodingPolicy> policies = new ArrayList<>();
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (!(node instanceof Element)) {
+        continue;
+      }
+
+      Element element = (Element) node;
+      if (!"policy".equals(element.getTagName())) {
+        throw new RuntimeException("Bad element in EC policy configuration"
+            + " file: " + element.getTagName());
+      }
+
+      ErasureCodingPolicy policy = loadPolicy(element, schemas);
+      if (policies.contains(policy)) {
+        LOG.warn("Repetitive policies in EC policy configuration file: "
+            + policy.toString());
+      } else {
+        policies.add(policy);
+      }
+    }
+
+    return policies;
+  }
+
+  /**
+   * Path to the XML file containing user defined EC policies. If the path is
+   * relative, it is searched for in the classpath.
+   *
+   * @param policyFilePath path of EC policy file
+   * @return EC policy file, or null if a relative path is not found on the
+   *         classpath
+   * @throws RuntimeException if the classpath resource is not a local file
+   */
+  private File getPolicyFile(String policyFilePath) {
+    File policyFile = new File(policyFilePath);
+    if (policyFile.isAbsolute()) {
+      return policyFile;
+    }
+
+    URL url = Thread.currentThread().getContextClassLoader()
+        .getResource(policyFilePath);
+    if (url == null) {
+      LOG.warn(policyFilePath + " not found on the classpath.");
+      return null;
+    }
+    if (!url.getProtocol().equalsIgnoreCase("file")) {
+      throw new RuntimeException(
+          "EC policy file " + url
+              + " found on the classpath is not on the local filesystem.");
+    }
+
+    try {
+      // URL paths are percent-encoded (a space becomes %20). Converting
+      // through the URI decodes them into a usable filesystem path.
+      return new File(url.toURI());
+    } catch (java.net.URISyntaxException | IllegalArgumentException e) {
+      // The URL cannot be expressed as a file URI; use its raw path.
+      return new File(url.getPath());
+    }
+  }
+
+  /**
+   * Load a schema from a schema element in the XML configuration file.
+   * Each child element becomes one schema option, keyed by its tag name with
+   * the trimmed text as value. The short tags <k> and <m> are stored as
+   * numDataUnits and numParityUnits. Non-element children are skipped.
+   *
+   * @param element EC schema element
+   * @return ECSchema
+   * @throws IllegalArgumentException if an option element is empty or does
+   *         not hold plain text
+   */
+  private ECSchema loadSchema(Element element) {
+    Map<String, String> schemaOptions = new HashMap<>();
+    NodeList fields = element.getChildNodes();
+
+    for (int i = 0; i < fields.getLength(); i++) {
+      Node fieldNode = fields.item(i);
+      if (!(fieldNode instanceof Element)) {
+        continue;
+      }
+
+      Element field = (Element) fieldNode;
+      String tagName = field.getTagName();
+      if ("k".equals(tagName)) {
+        tagName = "numDataUnits";
+      } else if ("m".equals(tagName)) {
+        tagName = "numParityUnits";
+      }
+
+      // Get the nonnull text value.
+      Node valueNode = field.getFirstChild();
+      if (valueNode == null) {
+        throw new IllegalArgumentException("Value of <" + tagName
+            + "> is null");
+      }
+      if (!(valueNode instanceof Text)) {
+        // A nested element here used to surface as a ClassCastException.
+        throw new IllegalArgumentException("Value of <" + tagName
+            + "> must be plain text");
+      }
+      schemaOptions.put(tagName, ((Text) valueNode).getData().trim());
+    }
+
+    return new ECSchema(schemaOptions);
+  }
+
+  /**
+   * Load an EC policy from a policy element in the XML configuration file.
+   * The <schema> child names a schema id to look up in the given map and the
+   * <cellsize> child gives the cell size; any other child tag is logged and
+   * ignored. Non-element children are skipped.
+   *
+   * @param element EC policy element
+   * @param schemas all valid schemas of the EC policy file
+   * @return EC policy
+   * @throws IllegalArgumentException if a child element is empty, does not
+   *         hold plain text, or the cell size is not an integer
+   * @throws RuntimeException if the schema id is missing or not in the map,
+   *         or the cell size is missing or not positive
+   */
+  private ErasureCodingPolicy loadPolicy(Element element,
+      Map<String, ECSchema> schemas) {
+    NodeList fields = element.getChildNodes();
+    ECSchema schema = null;
+    String schemaId = null;
+    int cellSize = 0;
+
+    for (int i = 0; i < fields.getLength(); i++) {
+      Node fieldNode = fields.item(i);
+      if (!(fieldNode instanceof Element)) {
+        continue;
+      }
+
+      Element field = (Element) fieldNode;
+      String tagName = field.getTagName();
+
+      // Get the nonnull text value.
+      Node valueNode = field.getFirstChild();
+      if (valueNode == null) {
+        throw new IllegalArgumentException("Value of <" + tagName
+            + "> is null");
+      }
+      if (!(valueNode instanceof Text)) {
+        // A nested element here used to surface as a ClassCastException.
+        throw new IllegalArgumentException("Value of <" + tagName
+            + "> must be plain text");
+      }
+
+      Text text = (Text) valueNode;
+      if (text.isElementContentWhitespace()) {
+        continue;
+      }
+
+      String value = text.getData().trim();
+      if ("schema".equals(tagName)) {
+        schemaId = value;
+        schema = schemas.get(value);
+      } else if ("cellsize".equals(tagName)) {
+        try {
+          cellSize = Integer.parseInt(value);
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Bad EC policy cellsize"
+              + " value " + value + " is found. It should be an integer", e);
+        }
+      } else {
+        LOG.warn("Invalid tagName: " + tagName);
+      }
+    }
+
+    if (schema == null || cellSize <= 0) {
+      // Keep the original message as the prefix and add the offending
+      // values so the bad <policy> entry can be found in the file.
+      throw new RuntimeException("Bad policy is found in"
+          + " EC policy configuration file: schema=" + schemaId
+          + (schema == null ? " (undefined)" : "")
+          + ", cellsize=" + cellSize);
+    }
+
+    // NOTE(review): (byte) -1 is passed as the third constructor argument
+    // exactly as before; presumably a placeholder policy id -- confirm.
+    return new ErasureCodingPolicy(schema, cellSize, (byte) -1);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestECPolicyLoader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestECPolicyLoader.java
new file mode 100644
index 00000000000..a6adb97dac6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestECPolicyLoader.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.PrintWriter; +import java.util.List; + +import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; + +/** + * Test load EC policy file. + */ +public class TestECPolicyLoader { + + private final static String TEST_DIR = new File(System.getProperty( + "test.build.data", "/tmp")).getAbsolutePath(); + + private final static String POLICY_FILE = new File(TEST_DIR, "test-ecpolicy") + .getAbsolutePath(); + + /** + * Test load EC policy. 
+ */ + @Test + public void testLoadECPolicy() throws Exception { + PrintWriter out = new PrintWriter(new FileWriter(POLICY_FILE)); + out.println(""); + out.println(""); + out.println("1"); + out.println(""); + out.println(" "); + out.println(" RS"); + out.println(" 12"); + out.println(" 4"); + out.println(" "); + out.println(" "); + out.println(" RS-legacy"); + out.println(" 12"); + out.println(" 4"); + out.println(" "); + out.println(""); + out.println(""); + out.println(" "); + out.println(" RSk12m4"); + out.println(" 131072"); + out.println(" "); + out.println(" "); + out.println(" RS-legacyk12m4"); + out.println(" 262144"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + ECPolicyLoader ecPolicyLoader = new ECPolicyLoader(); + List policies + = ecPolicyLoader.loadPolicy(POLICY_FILE); + + assertEquals(2, policies.size()); + + ErasureCodingPolicy policy1 = policies.get(0); + ECSchema schema1 = policy1.getSchema(); + assertEquals(131072, policy1.getCellSize()); + assertEquals(0, schema1.getExtraOptions().size()); + assertEquals(12, schema1.getNumDataUnits()); + assertEquals(4, schema1.getNumParityUnits()); + assertEquals("RS", schema1.getCodecName()); + + ErasureCodingPolicy policy2 = policies.get(1); + ECSchema schema2 = policy2.getSchema(); + assertEquals(262144, policy2.getCellSize()); + assertEquals(0, schema2.getExtraOptions().size()); + assertEquals(12, schema2.getNumDataUnits()); + assertEquals(4, schema2.getNumParityUnits()); + assertEquals("RS-legacy", schema2.getCodecName()); + } + + /** + * Test load null EC schema option. 
+ */ + @Test + public void testNullECSchemaOptionValue() throws Exception { + PrintWriter out = new PrintWriter(new FileWriter(POLICY_FILE)); + out.println(""); + out.println(""); + out.println("1"); + out.println(""); + out.println(" "); + out.println(" RS"); + out.println(" 12"); + out.println(" 4"); + out.println(" "); + out.println(" "); + out.println(" RS-legacy"); + out.println(" 12"); + out.println(" 4"); + out.println(" "); + out.println(" "); + out.println(""); + out.println(""); + out.println(" "); + out.println(" RS-legacyk12m4"); + out.println(" 1024"); + out.println(" "); + out.println(" "); + out.println(" RSk12m4"); + out.println(" 20480"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + ECPolicyLoader ecPolicyLoader = new ECPolicyLoader(); + + try { + ecPolicyLoader.loadPolicy(POLICY_FILE); + fail("IllegalArgumentException should be thrown for null value"); + } catch (IllegalArgumentException e) { + assertExceptionContains("Value of