HDFS-11604. Define and parse erasure code policies. Contributed by Lin Zeng

This commit is contained in:
Kai Zheng 2017-04-21 13:33:33 +08:00
parent de69d6e811
commit b0803388fc
4 changed files with 722 additions and 0 deletions

View File

@ -0,0 +1,328 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.SAXException;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * An EC policy loading tool that loads user defined EC policies from an XML
 * file.
 *
 * <p>The document must have a {@code <configuration>} root element that
 * contains a {@code <layoutversion>} element, a {@code <schemas>} element and
 * a {@code <policies>} element.
 */
@InterfaceAudience.Private
public class ECPolicyLoader {

  private static final Logger LOG
      = LoggerFactory.getLogger(ECPolicyLoader.class);

  /** The only XML layout version this loader accepts. */
  private static final int LAYOUT_VERSION = 1;

  /** Parser feature that rejects any DOCTYPE declaration (XXE hardening). */
  private static final String DISALLOW_DOCTYPE_DECL =
      "http://apache.org/xml/features/disallow-doctype-decl";

  /**
   * Load user defined EC policies from an XML configuration file.
   * @param policyFilePath path of EC policy file
   * @return all valid EC policies in EC policy file, or an empty list when
   *         the file cannot be located on the classpath
   * @throws RuntimeException if the file cannot be read or parsed, or if its
   *         content is not a valid EC policy configuration
   */
  public List<ErasureCodingPolicy> loadPolicy(String policyFilePath) {
    File policyFile = getPolicyFile(policyFilePath);
    if (policyFile == null) {
      LOG.warn("Not found any EC policy file");
      return Collections.emptyList();
    }

    try {
      return loadECPolicies(policyFile);
    } catch (ParserConfigurationException | IOException | SAXException e) {
      // Keep the original exception as the cause so the real reason for the
      // failure (missing file, malformed XML, ...) is not lost.
      throw new RuntimeException("Failed to load EC policy file: "
          + policyFile, e);
    }
  }

  /**
   * Load EC policies from an XML configuration file.
   * @param policyFile EC policy file
   * @return list of EC policies
   * @throws ParserConfigurationException if the XML parser cannot be
   *         configured
   * @throws IOException if the EC policy file cannot be read
   * @throws SAXException if the file is not well-formed XML
   */
  private List<ErasureCodingPolicy> loadECPolicies(File policyFile)
      throws ParserConfigurationException, IOException, SAXException {
    LOG.info("Loading EC policy file {}", policyFile);

    // Read and parse the EC policy file. DOCTYPE declarations are rejected
    // so that a policy file cannot pull in external entities.
    DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
    dbf.setIgnoringComments(true);
    dbf.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
    dbf.setFeature(DISALLOW_DOCTYPE_DECL, true);
    DocumentBuilder builder = dbf.newDocumentBuilder();
    Document doc = builder.parse(policyFile);
    Element root = doc.getDocumentElement();

    if (!"configuration".equals(root.getTagName())) {
      throw new RuntimeException("Bad EC policy configuration file: "
          + "top-level element not <configuration>");
    }
    if (root.getElementsByTagName("layoutversion").getLength() == 0) {
      throw new RuntimeException("Bad EC policy configuration file: "
          + "no <layoutversion> element");
    }
    if (loadLayoutVersion(root) != LAYOUT_VERSION) {
      throw new RuntimeException("The parse failed because of "
          + "bad layoutversion value");
    }
    if (root.getElementsByTagName("schemas").getLength() == 0) {
      throw new RuntimeException("Bad EC policy configuration file: "
          + "no <schemas> element");
    }

    // Schemas are parsed first because policies refer to them by id.
    Map<String, ECSchema> schemas = loadSchemas(root);
    if (root.getElementsByTagName("policies").getLength() == 0) {
      throw new RuntimeException("Bad EC policy configuration file: "
          + "no <policies> element");
    }
    return loadPolicies(root, schemas);
  }

  /**
   * Load the layout version from the first {@code <layoutversion>} element
   * under the root element of the XML configuration file.
   * @param root root element
   * @return layout version
   * @throws IllegalArgumentException if the element is empty or its value is
   *         not an integer
   */
  private int loadLayoutVersion(Element root) {
    Text text = getFirstChildText(
        root.getElementsByTagName("layoutversion").item(0));
    if (text == null) {
      throw new IllegalArgumentException("Value of <layoutversion> is null");
    }

    String value = text.getData().trim();
    try {
      return Integer.parseInt(value);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Bad layoutVersion value "
          + value + " is found. It should be an integer", e);
    }
  }

  /**
   * Load schemas from the first {@code <schemas>} element under the root
   * element of the XML configuration file.
   * @param root root element
   * @return EC schema map, keyed by the {@code id} attribute of each schema
   * @throws RuntimeException if a child element is not a {@code <schema>},
   *         or if a schema or a schema id is defined more than once
   */
  private Map<String, ECSchema> loadSchemas(Element root) {
    NodeList elements = root.getElementsByTagName("schemas")
        .item(0).getChildNodes();
    Map<String, ECSchema> schemas = new HashMap<>();

    for (int i = 0; i < elements.getLength(); i++) {
      Node node = elements.item(i);
      if (!(node instanceof Element)) {
        // Whitespace between elements shows up as text nodes; skip it.
        continue;
      }

      Element element = (Element) node;
      if (!"schema".equals(element.getTagName())) {
        throw new RuntimeException("Bad element in EC policy"
            + " configuration file: " + element.getTagName());
      }

      String schemaId = element.getAttribute("id");
      ECSchema schema = loadSchema(element);
      if (schemas.containsValue(schema)) {
        throw new RuntimeException("Repetitive schemas in EC policy"
            + " configuration file: " + schemaId);
      }
      if (schemas.containsKey(schemaId)) {
        // Without this check a reused id would silently replace the schema
        // registered earlier under the same id.
        throw new RuntimeException("Repetitive schema id in EC policy"
            + " configuration file: " + schemaId);
      }
      schemas.put(schemaId, schema);
    }
    return schemas;
  }

  /**
   * Load EC policies from the first {@code <policies>} element under the root
   * element of the XML configuration file. A policy that equals one already
   * loaded is logged and skipped.
   * @param root root element
   * @param schemas schema map
   * @return EC policy list
   * @throws RuntimeException if a child element is not a {@code <policy>}
   */
  private List<ErasureCodingPolicy> loadPolicies(
      Element root, Map<String, ECSchema> schemas) {
    NodeList elements = root.getElementsByTagName("policies")
        .item(0).getChildNodes();
    List<ErasureCodingPolicy> policies = new ArrayList<>();

    for (int i = 0; i < elements.getLength(); i++) {
      Node node = elements.item(i);
      if (!(node instanceof Element)) {
        continue;
      }

      Element element = (Element) node;
      if (!"policy".equals(element.getTagName())) {
        throw new RuntimeException("Bad element in EC policy configuration"
            + " file: " + element.getTagName());
      }

      ErasureCodingPolicy policy = loadPolicy(element, schemas);
      if (policies.contains(policy)) {
        LOG.warn("Repetitive policies in EC policy configuration file: {}",
            policy);
      } else {
        policies.add(policy);
      }
    }
    return policies;
  }

  /**
   * Path to the XML file containing user defined EC policies. If the path is
   * relative, it is searched for in the classpath.
   * @param policyFilePath path of EC policy file
   * @return EC policy file, or null if a relative path is not found on the
   *         classpath
   * @throws RuntimeException if the classpath resource is not a local file
   */
  private File getPolicyFile(String policyFilePath) {
    File policyFile = new File(policyFilePath);
    if (policyFile.isAbsolute()) {
      return policyFile;
    }

    URL url = Thread.currentThread().getContextClassLoader()
        .getResource(policyFilePath);
    if (url == null) {
      LOG.warn("{} not found on the classpath.", policyFilePath);
      return null;
    }
    if (!url.getProtocol().equalsIgnoreCase("file")) {
      throw new RuntimeException(
          "EC policy file " + url
              + " found on the classpath is not on the local filesystem.");
    }

    try {
      // URL#getPath() keeps percent-encoding (e.g. "%20" for a space), so
      // convert through a URI to obtain the decoded file system path.
      return new File(url.toURI());
    } catch (URISyntaxException | IllegalArgumentException e) {
      // The URL is not a strictly valid file URI; use its raw path instead.
      return new File(url.getPath());
    }
  }

  /**
   * Load a schema from a schema element in the XML configuration file. Every
   * child element becomes a schema option named after its tag, except that
   * {@code <k>} is stored as {@code numDataUnits} and {@code <m>} as
   * {@code numParityUnits}.
   * @param element EC schema element
   * @return ECSchema
   * @throws IllegalArgumentException if a child element has no text value
   */
  private ECSchema loadSchema(Element element) {
    Map<String, String> schemaOptions = new HashMap<>();
    NodeList fields = element.getChildNodes();

    for (int i = 0; i < fields.getLength(); i++) {
      Node fieldNode = fields.item(i);
      if (!(fieldNode instanceof Element)) {
        continue;
      }

      Element field = (Element) fieldNode;
      String tagName = field.getTagName();
      if ("k".equals(tagName)) {
        tagName = "numDataUnits";
      } else if ("m".equals(tagName)) {
        tagName = "numParityUnits";
      }

      // Get the nonnull text value.
      Text text = getFirstChildText(field);
      if (text == null) {
        throw new IllegalArgumentException("Value of <" + tagName
            + "> is null");
      }
      schemaOptions.put(tagName, text.getData().trim());
    }
    return new ECSchema(schemaOptions);
  }

  /**
   * Load an EC policy from a policy element in the XML configuration file.
   * The {@code <schema>} child names a schema id and the {@code <cellsize>}
   * child gives the cell size; any other child is logged and ignored.
   * @param element EC policy element
   * @param schemas all valid schemas of the EC policy file
   * @return EC policy
   * @throws IllegalArgumentException if a child element has no text value or
   *         the cell size is not an integer
   * @throws RuntimeException if the schema id is unknown or missing, or the
   *         cell size is missing or not positive
   */
  private ErasureCodingPolicy loadPolicy(Element element,
      Map<String, ECSchema> schemas) {
    NodeList fields = element.getChildNodes();
    ECSchema schema = null;
    int cellSize = 0;

    for (int i = 0; i < fields.getLength(); i++) {
      Node fieldNode = fields.item(i);
      if (!(fieldNode instanceof Element)) {
        continue;
      }

      Element field = (Element) fieldNode;
      String tagName = field.getTagName();

      // Get the nonnull text value.
      Text text = getFirstChildText(field);
      if (text == null) {
        throw new IllegalArgumentException("Value of <" + tagName
            + "> is null");
      }
      if (text.isElementContentWhitespace()) {
        continue;
      }

      String value = text.getData().trim();
      if ("schema".equals(tagName)) {
        schema = schemas.get(value);
      } else if ("cellsize".equals(tagName)) {
        try {
          cellSize = Integer.parseInt(value);
        } catch (NumberFormatException e) {
          throw new IllegalArgumentException("Bad EC policy cellsize"
              + " value " + value + " is found. It should be an integer", e);
        }
      } else {
        LOG.warn("Invalid tagName: {}", tagName);
      }
    }

    if (schema == null || cellSize <= 0) {
      throw new RuntimeException("Bad policy is found in"
          + " EC policy configuration file");
    }
    return new ErasureCodingPolicy(schema, cellSize, (byte) -1);
  }

  /**
   * Return the first child of a node as text.
   * @param parent node whose value is wanted
   * @return the first child when it is a text node, or null when the node
   *         has no children
   * @throws IllegalArgumentException if the first child is not a text node,
   *         for example when an element is nested where a value is expected
   */
  private static Text getFirstChildText(Node parent) {
    Node child = parent.getFirstChild();
    if (child == null) {
      return null;
    }
    if (!(child instanceof Text)) {
      // A blind cast here would surface as an unexplained
      // ClassCastException; report the offending element instead.
      throw new IllegalArgumentException("Value of <" + parent.getNodeName()
          + "> should be plain text");
    }
    return (Text) child;
  }
}

View File

@ -0,0 +1,313 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.List;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
/**
 * Test load EC policy file.
 *
 * <p>Each test writes an XML policy file to {@link #POLICY_FILE} and then
 * loads it through {@link ECPolicyLoader#loadPolicy(String)}.
 */
public class TestECPolicyLoader {

  private final static String TEST_DIR = new File(System.getProperty(
      "test.build.data", "/tmp")).getAbsolutePath();

  private final static String POLICY_FILE = new File(TEST_DIR, "test-ecpolicy")
      .getAbsolutePath();

  /**
   * Write the given lines to {@link #POLICY_FILE}, replacing any previous
   * content. The writer is always closed, and a write failure is reported
   * instead of being swallowed by {@link PrintWriter}.
   * @param lines lines of the policy file, in order
   * @throws Exception if the test directory or the file cannot be written
   */
  private static void writePolicyFile(String... lines) throws Exception {
    File testDir = new File(TEST_DIR);
    if (!testDir.isDirectory() && !testDir.mkdirs()) {
      throw new IllegalStateException("Cannot create test directory "
          + testDir);
    }

    try (PrintWriter out = new PrintWriter(new FileWriter(POLICY_FILE))) {
      for (String line : lines) {
        out.println(line);
      }
      // PrintWriter never throws on I/O errors; it only records them.
      if (out.checkError()) {
        throw new IllegalStateException("Failed to write " + POLICY_FILE);
      }
    }
  }

  /**
   * Test load EC policy.
   */
  @Test
  public void testLoadECPolicy() throws Exception {
    writePolicyFile(
        "<?xml version=\"1.0\"?>",
        "<configuration>",
        "<layoutversion>1</layoutversion>",
        "<schemas>",
        " <schema id=\"RSk12m4\">",
        " <codec>RS</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " </schema>",
        " <schema id=\"RS-legacyk12m4\">",
        " <codec>RS-legacy</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " </schema>",
        "</schemas>",
        "<policies>",
        " <policy>",
        " <schema>RSk12m4</schema>",
        " <cellsize>131072</cellsize>",
        " </policy>",
        " <policy>",
        " <schema>RS-legacyk12m4</schema>",
        " <cellsize>262144</cellsize>",
        " </policy>",
        "</policies>",
        "</configuration>");

    ECPolicyLoader ecPolicyLoader = new ECPolicyLoader();
    List<ErasureCodingPolicy> policies
        = ecPolicyLoader.loadPolicy(POLICY_FILE);
    assertEquals(2, policies.size());

    ErasureCodingPolicy policy1 = policies.get(0);
    ECSchema schema1 = policy1.getSchema();
    assertEquals(131072, policy1.getCellSize());
    assertEquals(0, schema1.getExtraOptions().size());
    assertEquals(12, schema1.getNumDataUnits());
    assertEquals(4, schema1.getNumParityUnits());
    assertEquals("RS", schema1.getCodecName());

    ErasureCodingPolicy policy2 = policies.get(1);
    ECSchema schema2 = policy2.getSchema();
    assertEquals(262144, policy2.getCellSize());
    assertEquals(0, schema2.getExtraOptions().size());
    assertEquals(12, schema2.getNumDataUnits());
    assertEquals(4, schema2.getNumParityUnits());
    assertEquals("RS-legacy", schema2.getCodecName());
  }

  /**
   * Test load null EC schema option.
   */
  @Test
  public void testNullECSchemaOptionValue() throws Exception {
    writePolicyFile(
        "<?xml version=\"1.0\"?>",
        "<configuration>",
        "<layoutversion>1</layoutversion>",
        "<schemas>",
        " <schema id=\"RSk12m4\">",
        " <codec>RS</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " </schema>",
        " <schema id=\"RS-legacyk12m4\">",
        " <codec>RS-legacy</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " <option></option>",
        " </schema>",
        "</schemas>",
        "<policies>",
        " <policy>",
        " <schema>RS-legacyk12m4</schema>",
        " <cellsize>1024</cellsize>",
        " </policy>",
        " <policy>",
        " <schema>RSk12m4</schema>",
        " <cellsize>20480</cellsize>",
        " </policy>",
        "</policies>",
        "</configuration>");

    ECPolicyLoader ecPolicyLoader = new ECPolicyLoader();
    try {
      ecPolicyLoader.loadPolicy(POLICY_FILE);
      fail("IllegalArgumentException should be thrown for null value");
    } catch (IllegalArgumentException e) {
      assertExceptionContains("Value of <option> is null", e);
    }
  }

  /**
   * Test load repetitive EC schema.
   */
  @Test
  public void testRepeatECSchema() throws Exception {
    writePolicyFile(
        "<?xml version=\"1.0\"?>",
        "<configuration>",
        "<layoutversion>1</layoutversion>",
        "<schemas>",
        " <schema id=\"RSk12m4\">",
        " <codec>RS-legacy</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " </schema>",
        " <schema id=\"RS-legacyk12m4\">",
        " <codec>RS-legacy</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " </schema>",
        "</schemas>",
        "<policies>",
        " <policy>",
        " <schema>RS-legacyk12m4</schema>",
        " <cellsize>1024</cellsize>",
        " </policy>",
        " <policy>",
        " <schema>RSk12m4</schema>",
        " <cellsize>20480</cellsize>",
        " </policy>",
        "</policies>",
        "</configuration>");

    ECPolicyLoader ecPolicyLoader = new ECPolicyLoader();
    try {
      ecPolicyLoader.loadPolicy(POLICY_FILE);
      fail("RuntimeException should be thrown for repetitive elements");
    } catch (RuntimeException e) {
      assertExceptionContains("Repetitive schemas in EC policy"
          + " configuration file: RS-legacyk12m4", e);
    }
  }

  /**
   * Test load bad EC policy layoutversion.
   */
  @Test
  public void testBadECLayoutVersion() throws Exception {
    writePolicyFile(
        "<?xml version=\"1.0\"?>",
        "<configuration>",
        "<layoutversion>3</layoutversion>",
        "<schemas>",
        " <schema id=\"RSk12m4\">",
        " <codec>RS</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " </schema>",
        " <schema id=\"RS-legacyk12m4\">",
        " <codec>RS-legacy</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " </schema>",
        "</schemas>",
        "<policies>",
        " <policy>",
        " <schema>RSk12m4</schema>",
        " <cellsize>1024</cellsize>",
        " </policy>",
        "</policies>",
        "</configuration>");

    ECPolicyLoader ecPolicyLoader = new ECPolicyLoader();
    try {
      ecPolicyLoader.loadPolicy(POLICY_FILE);
      fail("RuntimeException should be thrown for bad layoutversion");
    } catch (RuntimeException e) {
      assertExceptionContains("The parse failed because of "
          + "bad layoutversion value", e);
    }
  }

  /**
   * Test load bad EC policy cellsize.
   */
  @Test
  public void testBadECCellsize() throws Exception {
    writePolicyFile(
        "<?xml version=\"1.0\"?>",
        "<configuration>",
        "<layoutversion>1</layoutversion>",
        "<schemas>",
        " <schema id=\"RSk12m4\">",
        " <codec>RS</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " </schema>",
        " <schema id=\"RS-legacyk12m4\">",
        " <codec>RS-legacy</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " </schema>",
        "</schemas>",
        "<policies>",
        " <policy>",
        " <schema>RSk12m4</schema>",
        " <cellsize>free</cellsize>",
        " </policy>",
        "</policies>",
        "</configuration>");

    ECPolicyLoader ecPolicyLoader = new ECPolicyLoader();
    try {
      ecPolicyLoader.loadPolicy(POLICY_FILE);
      fail("IllegalArgumentException should be thrown for bad policy");
    } catch (IllegalArgumentException e) {
      assertExceptionContains("Bad EC policy cellsize value free is found."
          + " It should be an integer", e);
    }
  }

  /**
   * Test load bad EC policy.
   */
  @Test
  public void testBadECPolicy() throws Exception {
    writePolicyFile(
        "<?xml version=\"1.0\"?>",
        "<configuration>",
        "<layoutversion>1</layoutversion>",
        "<schemas>",
        " <schema id=\"RSk12m4\">",
        " <codec>RS</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " </schema>",
        " <schema id=\"RS-legacyk12m4\">",
        " <codec>RS-legacy</codec>",
        " <k>12</k>",
        " <m>4</m>",
        " </schema>",
        "</schemas>",
        "<policies>",
        " <policy>",
        " <schema>RSk12m4</schema>",
        " <cellsize>-1025</cellsize>",
        " </policy>",
        "</policies>",
        "</configuration>");

    ECPolicyLoader ecPolicyLoader = new ECPolicyLoader();
    try {
      ecPolicyLoader.loadPolicy(POLICY_FILE);
      fail("RuntimeException should be thrown for bad policy");
    } catch (RuntimeException e) {
      assertExceptionContains("Bad policy is found in EC policy"
          + " configuration file", e);
    }
  }
}

View File

@ -0,0 +1,71 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
This is the template for user-defined EC policies configuration.
All policies and schemas are defined within the 'configuration' tag
which is the top level element for this XML document. The 'layoutversion'
tag contains the version of EC policy XML file format, and user-defined EC
schemas are included within the 'schemas' tag. The 'policies' tag
contains all the user defined EC policies, and each policy consists of
schema id and cellsize.
-->
<configuration>
<!-- The version of the EC policy XML file format; it must be an integer -->
<layoutversion>1</layoutversion>
<schemas>
<!-- The schema id is only used for internal references within this document -->
<schema id="XORk2m1">
<!-- The combination of codec, k, m and options as the schema ID, defines
a unique schema, for example 'xor-2-1'. schema ID is case insensitive -->
<!-- codec with this specific name should exist already in this system -->
<codec>xor</codec>
<k>2</k>
<m>1</m>
<options> </options>
</schema>
<schema id="RSk12m4">
<codec>RS</codec>
<k>12</k>
<m>4</m>
<options> </options>
</schema>
<schema id="RS-legacyk12m4">
<codec>RS-legacy</codec>
<k>12</k>
<m>4</m>
<options> </options>
</schema>
</schemas>
<policies>
<policy>
<!-- The combination of schema ID and cellsize (in units of k) defines a unique
policy, for example 'xor-2-1-256k'; it is case insensitive -->
<!-- schema is referred by its id -->
<schema>XORk2m1</schema>
<!-- cellsize must be a positive integer multiple of 1024 (1k) -->
<cellsize>131072</cellsize>
</policy>
<policy>
<schema>RS-legacyk12m4</schema>
<cellsize>262144</cellsize>
</policy>
</policies>
</configuration>

View File

@ -73,6 +73,16 @@ Architecture
Directory-level EC policies only affect new files created within the directory. Once a file has been created, its erasure coding policy can be queried but not changed. If an erasure coded file is renamed to a directory with a different EC policy, the file retains its existing EC policy. Converting a file to a different EC policy requires rewriting its data; do this by copying the file (e.g. via distcp) rather than renaming it.
We allow users to define their own EC policies via an XML file, which must have the following three parts:
1. _layoutversion:_ This indicates the version of EC policy XML file format.
2. _schemas:_ This includes all the user defined EC schemas.
3. _policies:_ This includes all the user defined EC policies, and each policy consists of schema id and the size of a striping cell (cellsize).
A sample EC policy XML file named user_ec_policies.xml.template is provided in the Hadoop conf directory for users to reference.
* **Intel ISA-L**
Intel ISA-L stands for Intel Intelligent Storage Acceleration Library. ISA-L is an open-source collection of optimized low-level functions designed for storage applications. It includes fast block Reed-Solomon type erasure codes optimized for Intel AVX and AVX2 instruction sets.
HDFS erasure coding can leverage ISA-L to accelerate encoding and decoding calculation. ISA-L supports most major operating systems, including Linux and Windows.