HDFS-11314. Enforce set of enabled EC policies on the NameNode.

This commit is contained in:
Andrew Wang 2017-03-08 16:41:44 -08:00
parent 5ca6ef0c26
commit 33a38a5341
26 changed files with 401 additions and 110 deletions

View File

@ -562,6 +562,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT =
"10m";
// Comma-delimited names of the erasure coding policies enabled on the
// NameNode; the default enables only RS-6-3-64k.
public static final String DFS_NAMENODE_EC_POLICIES_ENABLED_KEY = "dfs.namenode.ec.policies.enabled";
public static final String DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT = "RS-6-3-64k";
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedread.threads";
public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT = 20;
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.ec.reconstruction.stripedread.buffer.size";

View File

@ -18,12 +18,17 @@
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
/**
* This manages erasure coding policies predefined and activated in the system.
@ -61,21 +66,52 @@ public final class ErasureCodingPolicyManager {
SYS_POLICY4, SYS_POLICY5};
// Supported storage policies for striped EC files
private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE = new byte[] {
HdfsConstants.HOT_STORAGE_POLICY_ID, HdfsConstants.COLD_STORAGE_POLICY_ID,
HdfsConstants.ALLSSD_STORAGE_POLICY_ID };
private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE =
new byte[]{
HdfsConstants.HOT_STORAGE_POLICY_ID,
HdfsConstants.COLD_STORAGE_POLICY_ID,
HdfsConstants.ALLSSD_STORAGE_POLICY_ID};
/**
* All active policies maintained in NN memory for fast querying,
* All supported policies maintained in NN memory for fast querying,
* identified and sorted by its name.
*/
private final Map<String, ErasureCodingPolicy> activePoliciesByName;
private static final Map<String, ErasureCodingPolicy> SYSTEM_POLICIES_BY_NAME;
ErasureCodingPolicyManager() {
this.activePoliciesByName = new TreeMap<>();
static {
// Create a hashmap of all available policies for quick lookup by name
SYSTEM_POLICIES_BY_NAME = new TreeMap<>();
for (ErasureCodingPolicy policy : SYS_POLICIES) {
activePoliciesByName.put(policy.getName(), policy);
SYSTEM_POLICIES_BY_NAME.put(policy.getName(), policy);
}
}
/**
* All enabled policies maintained in NN memory for fast querying,
* identified and sorted by its name.
*/
private final Map<String, ErasureCodingPolicy> enabledPoliciesByName;
ErasureCodingPolicyManager(Configuration conf) {
// Populate the list of enabled policies from configuration
final String[] policyNames = conf.getTrimmedStrings(
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT);
this.enabledPoliciesByName = new TreeMap<>();
for (String policyName : policyNames) {
ErasureCodingPolicy ecPolicy = SYSTEM_POLICIES_BY_NAME.get(policyName);
if (ecPolicy == null) {
String sysPolicies = Arrays.asList(SYS_POLICIES).stream()
.map(ErasureCodingPolicy::getName)
.collect(Collectors.joining(", "));
String msg = String.format("EC policy %s specified at %s is not a " +
"valid policy. Please choose from list of available policies: " +
"[%s]",
policyName,
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
sysPolicies);
throw new IllegalArgumentException(msg);
}
enabledPoliciesByName.put(ecPolicy.getName(), ecPolicy);
}
/**
@ -104,10 +140,10 @@ public static ErasureCodingPolicy getSystemDefaultPolicy() {
}
/**
* Get system-wide policy by policy ID.
* @return ecPolicy
* Get a policy by policy ID.
* @return ecPolicy, or null if not found
*/
public static ErasureCodingPolicy getPolicyByPolicyID(byte id) {
public static ErasureCodingPolicy getPolicyByID(byte id) {
for (ErasureCodingPolicy policy : SYS_POLICIES) {
if (policy.getId() == id) {
return policy;
@ -117,36 +153,33 @@ public static ErasureCodingPolicy getPolicyByPolicyID(byte id) {
}
/**
* Get all policies that's available to use.
* Get a policy by policy name.
* @return ecPolicy, or null if not found
*/
public static ErasureCodingPolicy getPolicyByName(String name) {
return SYSTEM_POLICIES_BY_NAME.get(name);
}
/**
* Get the set of enabled policies.
* @return all enabled policies
*/
public ErasureCodingPolicy[] getPolicies() {
public ErasureCodingPolicy[] getEnabledPolicies() {
ErasureCodingPolicy[] results =
new ErasureCodingPolicy[activePoliciesByName.size()];
return activePoliciesByName.values().toArray(results);
new ErasureCodingPolicy[enabledPoliciesByName.size()];
return enabledPoliciesByName.values().toArray(results);
}
/**
* Get the policy specified by the policy name.
* Get enabled policy by policy name.
*/
public ErasureCodingPolicy getPolicyByName(String name) {
return activePoliciesByName.get(name);
public ErasureCodingPolicy getEnabledPolicyByName(String name) {
return enabledPoliciesByName.get(name);
}
/**
* Get the policy specified by the policy ID.
*/
public ErasureCodingPolicy getPolicyByID(byte id) {
for (ErasureCodingPolicy policy : activePoliciesByName.values()) {
if (policy.getId() == id) {
return policy;
}
}
return null;
}
/**
* @return True if given policy is be suitable for striped EC Files.
* @return if the specified storage policy ID is suitable for striped EC
* files.
*/
public static boolean checkStoragePolicySuitableForECStripedMode(
byte storagePolicyID) {
@ -164,6 +197,6 @@ public static boolean checkStoragePolicySuitableForECStripedMode(
* Clear and clean up.
*/
public void clear() {
activePoliciesByName.clear();
enabledPoliciesByName.clear();
}
}

View File

@ -23,9 +23,10 @@
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -34,6 +35,7 @@
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -82,11 +84,23 @@ static HdfsFileStatus setErasureCodingPolicy(final FSNamesystem fsn,
fsd.writeLock();
try {
ErasureCodingPolicy ecPolicy = fsn.getErasureCodingPolicyManager()
.getPolicyByName(ecPolicyName);
.getEnabledPolicyByName(ecPolicyName);
if (ecPolicy == null) {
throw new HadoopIllegalArgumentException("Policy '" +
ecPolicyName + "' does not match any supported erasure coding " +
"policies.");
final String sysPolicies =
Arrays.asList(
fsn.getErasureCodingPolicyManager().getEnabledPolicies())
.stream()
.map(ErasureCodingPolicy::getName)
.collect(Collectors.joining(", "));
final String message = String.format("Policy '%s' does not match any " +
"enabled erasure" +
" coding policies: [%s]. The set of enabled erasure coding " +
"policies can be configured at '%s'.",
ecPolicyName,
sysPolicies,
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY
);
throw new HadoopIllegalArgumentException(message);
}
iip = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
// Write access is required to set erasure coding policy
@ -118,26 +132,6 @@ private static List<XAttr> setErasureCodingPolicyXAttr(final FSNamesystem fsn,
"for a file " + src);
}
// Check that the EC policy is one of the active policies.
boolean validPolicy = false;
ErasureCodingPolicy[] activePolicies =
FSDirErasureCodingOp.getErasureCodingPolicies(fsd.getFSNamesystem());
for (ErasureCodingPolicy activePolicy : activePolicies) {
if (activePolicy.equals(ecPolicy)) {
validPolicy = true;
break;
}
}
if (!validPolicy) {
List<String> ecPolicyNames = new ArrayList<String>();
for (ErasureCodingPolicy activePolicy : activePolicies) {
ecPolicyNames.add(activePolicy.getName());
}
throw new HadoopIllegalArgumentException("Policy [ " +
ecPolicy.getName() + " ] does not match any of the " +
"supported policies. Please select any one of " + ecPolicyNames);
}
final XAttr ecXAttr;
DataOutputStream dOut = null;
try {
@ -291,7 +285,7 @@ static ErasureCodingPolicy unprotectedGetErasureCodingPolicy(
static ErasureCodingPolicy[] getErasureCodingPolicies(final FSNamesystem fsn)
throws IOException {
assert fsn.hasReadLock();
return fsn.getErasureCodingPolicyManager().getPolicies();
return fsn.getErasureCodingPolicyManager().getEnabledPolicies();
}
private static ErasureCodingPolicy getErasureCodingPolicyForPath(
@ -307,8 +301,8 @@ private static ErasureCodingPolicy getErasureCodingPolicyForPath(
}
if (inode.isFile()) {
byte id = inode.asFile().getErasureCodingPolicyID();
return id < 0 ? null : fsd.getFSNamesystem().
getErasureCodingPolicyManager().getPolicyByID(id);
return id < 0 ? null :
ErasureCodingPolicyManager.getPolicyByID(id);
}
// We don't allow setting EC policies on paths with a symlink. Thus
// if a symlink is encountered, the dir shouldn't have EC policy.
@ -323,8 +317,8 @@ private static ErasureCodingPolicy getErasureCodingPolicyForPath(
ByteArrayInputStream bIn = new ByteArrayInputStream(xattr.getValue());
DataInputStream dIn = new DataInputStream(bIn);
String ecPolicyName = WritableUtils.readString(dIn);
return fsd.getFSNamesystem().getErasureCodingPolicyManager().
getPolicyByName(ecPolicyName);
return ErasureCodingPolicyManager
.getPolicyByName(ecPolicyName);
}
}
}

View File

@ -335,7 +335,7 @@ private INodeFile loadINodeFile(INodeSection.INode n) {
assert ((!isStriped) || (isStriped && !f.hasReplication()));
Short replication = (!isStriped ? (short) f.getReplication() : null);
ErasureCodingPolicy ecPolicy = isStriped ?
ErasureCodingPolicyManager.getPolicyByPolicyID(
ErasureCodingPolicyManager.getPolicyByID(
(byte) f.getErasureCodingPolicyID()) : null;
Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);

View File

@ -838,7 +838,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
this.dir = new FSDirectory(this, conf);
this.snapshotManager = new SnapshotManager(dir);
this.cacheManager = new CacheManager(this, conf, blockManager);
this.ecPolicyManager = new ErasureCodingPolicyManager();
this.ecPolicyManager = new ErasureCodingPolicyManager(conf);
this.topConf = new TopConf(conf);
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&

View File

@ -192,7 +192,7 @@ static long getBlockLayoutRedundancy(final BlockType blockType,
Preconditions.checkArgument(replication == null &&
erasureCodingPolicyID != null);
Preconditions.checkArgument(ErasureCodingPolicyManager
.getPolicyByPolicyID(erasureCodingPolicyID) != null,
.getPolicyByID(erasureCodingPolicyID) != null,
"Could not find EC policy with ID 0x" + StringUtils
.byteToHexString(erasureCodingPolicyID));
layoutRedundancy |= BLOCK_TYPE_MASK_STRIPED;
@ -514,7 +514,7 @@ public short getPreferredBlockReplication() {
}
ErasureCodingPolicy ecPolicy =
ErasureCodingPolicyManager.getPolicyByPolicyID(
ErasureCodingPolicyManager.getPolicyByID(
getErasureCodingPolicyID());
Preconditions.checkNotNull(ecPolicy, "Could not find EC policy with ID 0x"
+ StringUtils.byteToHexString(getErasureCodingPolicyID()));

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.tools.TableListing;
@ -79,7 +80,7 @@ public int run(String[] args) throws Exception {
}
}
/** Command to list the set of available erasure coding policies */
/** Command to list the set of enabled erasure coding policies. */
private static class ListECPoliciesCommand
implements AdminHelper.Command {
@Override
@ -95,7 +96,9 @@ public String getShortUsage() {
@Override
public String getLongUsage() {
return getShortUsage() + "\n" +
"Get the list of supported erasure coding policies.\n";
"Get the list of enabled erasure coding policies.\n" +
"Policies can be enabled on the NameNode via `" +
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY + "`.\n";
}
@Override
@ -109,10 +112,19 @@ public int run(Configuration conf, List<String> args) throws IOException {
try {
Collection<ErasureCodingPolicy> policies =
dfs.getAllErasureCodingPolicies();
System.out.println("Erasure Coding Policies:");
for (ErasureCodingPolicy policy : policies) {
if (policy != null) {
System.out.println("\t" + policy.getName());
if (policies.isEmpty()) {
System.out.println("No erasure coding policies are enabled on the " +
"cluster.");
System.out.println("The set of enabled policies can be " +
"configured at '" +
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY + "' on the " +
"NameNode.");
} else {
System.out.println("Erasure Coding Policies:");
for (ErasureCodingPolicy policy : policies) {
if (policy != null) {
System.out.println("\t" + policy.getName());
}
}
}
} catch (IOException e) {

View File

@ -2928,6 +2928,15 @@
</description>
</property>
<property>
<name>dfs.namenode.ec.policies.enabled</name>
<value>RS-6-3-64k</value>
<description>Comma-delimited list of enabled erasure coding policies.
The NameNode will enforce this when setting an erasure coding policy
on a directory.
</description>
</property>
<property>
<name>dfs.datanode.ec.reconstruction.stripedread.timeout.millis</name>
<value>5000</value>

View File

@ -56,19 +56,26 @@ Architecture
3. _Transfer the generated data blocks to target nodes:_ Once decoding is finished, the recovered blocks are transferred to target DataNodes.
* **ErasureCoding policy**
To accommodate heterogeneous workloads, we allow files and directories in an HDFS cluster to have different replication and EC policies.
Information on how to encode/decode a file is encapsulated in an ErasureCodingPolicy class. Each policy is defined by the following 2 pieces of information:
* **Erasure coding policies**
To accommodate heterogeneous workloads, we allow files and directories in an HDFS cluster to have different replication and erasure coding policies.
The erasure coding policy encapsulates how to encode/decode a file. Each policy is defined by the following pieces of information:
1. _The ECSchema:_ This includes the numbers of data and parity blocks in an EC group (e.g., 6+3), as well as the codec algorithm (e.g., Reed-Solomon).
1. _The EC schema:_ This includes the numbers of data and parity blocks in an EC group (e.g., 6+3), as well as the codec algorithm (e.g., Reed-Solomon, XOR).
2. _The size of a striping cell._ This determines the granularity of striped reads and writes, including buffer sizes and encoding work.
Five policies are currently supported: RS-3-2-64k, RS-6-3-64k, RS-10-4-64k, RS-LEGACY-6-3-64k, and XOR-2-1-64k. All with default cell size of 64KB. The system default policy is RS-6-3-64k which use the default schema RS_6_3_SCHEMA with a cell size of 64KB.
Policies are named *codec*-*num data blocks*-*num parity blocks*-*cell size*. Currently, five built-in policies are supported: `RS-3-2-64k`, `RS-6-3-64k`, `RS-10-4-64k`, `RS-LEGACY-6-3-64k`, and `XOR-2-1-64k`.
By default, only `RS-6-3-64k` is enabled.
Similar to HDFS storage policies, erasure coding policies are set on a directory. When a file is created, it inherits the EC policy of its nearest ancestor directory.
Directory-level EC policies only affect new files created within the directory. Once a file has been created, its erasure coding policy can be queried but not changed. If an erasure coded file is renamed to a directory with a different EC policy, the file retains its existing EC policy. Converting a file to a different EC policy requires rewriting its data; do this by copying the file (e.g. via distcp) rather than renaming it.
* **Intel ISA-L**
Intel ISA-L stands for Intel Intelligent Storage Acceleration Library. ISA-L is a collection of optimized low-level functions used primarily in storage applications. It includes a fast block Reed-Solomon type erasure codes optimized for Intel AVX and AVX2 instruction sets.
HDFS EC can leverage this open-source library to accelerate encoding and decoding calculation. ISA-L supports most of major operating systems, including Linux and Windows. By default, ISA-L is not enabled in HDFS.
Intel ISA-L stands for Intel Intelligent Storage Acceleration Library. ISA-L is an open-source collection of optimized low-level functions designed for storage applications. It includes fast block Reed-Solomon type erasure codes optimized for Intel AVX and AVX2 instruction sets.
HDFS erasure coding can leverage ISA-L to accelerate encoding and decoding calculation. ISA-L supports most major operating systems, including Linux and Windows.
ISA-L is not enabled by default. See the instructions below for how to enable ISA-L.
Deployment
----------
@ -90,6 +97,10 @@ Deployment
### Configuration keys
The set of enabled erasure coding policies can be configured on the NameNode via `dfs.namenode.ec.policies.enabled`. This restricts what EC policies can be set by clients. It does not affect the behavior of already set file or directory-level EC policies.
By default, only the `RS-6-3-64k` policy is enabled. Typically, the cluster administrator will configure the set of enabled policies based on the size of the cluster and the desired fault-tolerance properties. For instance, for a cluster with 9 racks, a policy like `RS-10-4-64k` will not preserve rack-level fault-tolerance, and `RS-6-3-64k` or `RS-3-2-64k` might be more appropriate. If the administrator only cares about node-level fault-tolerance, `RS-10-4-64k` would still be appropriate as long as there are at least 14 DataNodes in the cluster.
The codec implementation for Reed-Solomon and XOR can be configured with the following client and DataNode configuration keys:
`io.erasurecode.codec.rs.rawcoder` for the default RS codec,
`io.erasurecode.codec.rs-legacy.rawcoder` for the legacy RS codec,
@ -106,13 +117,13 @@ Deployment
### Enable Intel ISA-L
HDFS native implementation of default RS codec leverages Intel ISA-L library to improve the encoding and decoding calculation. To enable and use Intel ISA-L, there are three steps.
1. Build ISA-L library. Please refer to the offical site "https://github.com/01org/isa-l/" for detail information.
2. Build Hadoop with ISA-L support. Please refer to "Intel ISA-L build options" section in "Build instructions for Hadoop"(BUILDING.txt) document. Use -Dbundle.isal to copy the contents of the isal.lib directory into the final tar file. Deploy hadoop with the tar file. Make sure ISA-L library is available on both HDFS client and DataNodes.
3. Configure the `io.erasurecode.codec.rs.rawcoder` key with value `org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory` on HDFS client and DataNodes.
1. Build ISA-L library. Please refer to the official site "https://github.com/01org/isa-l/" for detailed information.
2. Build Hadoop with ISA-L support. Please refer to the "Intel ISA-L build options" section in "Build instructions for Hadoop" (BUILDING.txt) in the source code. Use `-Dbundle.isal` to copy the contents of the `isal.lib` directory into the final tar file. Deploy Hadoop with the tar file. Make sure ISA-L is available on HDFS clients and DataNodes.
3. Configure the `io.erasurecode.codec.rs.rawcoder` key with value `org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory` on HDFS clients and DataNodes.
To check ISA-L library enable state, try "Hadoop checknative" command. It will tell you if ISA-L library is enabled or not.
To verify that ISA-L is correctly detected by Hadoop, run the `hadoop checknative` command.
It also requires three steps to enable the native implementation of XOR codec. The first two steps are the same as the above step 1 and step 2. In step 3, configure the `io.erasurecode.codec.xor.rawcoder` key with `org.apache.hadoop.io.erasurecode.rawcoder.NativeXORRawErasureCoderFactory` on both HDFS client and DataNodes.
To enable the native implementation of the XOR codec, perform the same first two steps as above to build and deploy Hadoop with ISA-L support. Afterwards, configure the `io.erasurecode.codec.xor.rawcoder` key with `org.apache.hadoop.io.erasurecode.rawcoder.NativeXORRawErasureCoderFactory` on both HDFS clients and DataNodes.
### Administrative commands
@ -130,20 +141,20 @@ Below are the details about each command.
* `[-setPolicy -policy <policyName> -path <path>]`
Sets an ErasureCoding policy on a directory at the specified path.
Sets an erasure coding policy on a directory at the specified path.
`path`: A directory in HDFS. This is a mandatory parameter. Setting a policy only affects newly created files, and does not affect existing files.
`policyName`: The ErasureCoding policy to be used for files under this directory.
`policyName`: The erasure coding policy to be used for files under this directory.
* `[-getPolicy -path <path>]`
Get details of the ErasureCoding policy of a file or directory at the specified path.
Get details of the erasure coding policy of a file or directory at the specified path.
* `[-unsetPolicy -path <path>]`
Unset an ErasureCoding policy set by a previous call to "setPolicy" on a directory. If the directory inherits the ErasureCoding policy from an ancestor directory, "unsetPolicy" is a no-op. Unsetting the policy on a directory which doesn't have an explicit policy set will not return an error.
Unset an erasure coding policy set by a previous call to `setPolicy` on a directory. If the directory inherits the erasure coding policy from an ancestor directory, `unsetPolicy` is a no-op. Unsetting the policy on a directory which doesn't have an explicit policy set will not return an error.
* `[-listPolicies]`
Lists all supported ErasureCoding policies. These names are suitable for use with the `setPolicy` command.
Lists the set of enabled erasure coding policies. These names are suitable for use with the `setPolicy` command.

View File

@ -45,6 +45,9 @@ public class TestErasureCodingCLI extends CLITestHelper {
public void setUp() throws Exception {
super.setUp();
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
"RS-6-3-64k,RS-3-2-64k,XOR-2-1-64k");
dfsCluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_OF_DATANODES).build();
dfsCluster.waitClusterUp();

View File

@ -68,6 +68,8 @@
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@ -279,6 +281,15 @@ public static void setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog) {
Whitebox.setInternalState(fsn.getFSDirectory(), "editLog", newLog);
}
/**
 * Enables every system erasure coding policy in the given configuration.
 * Sets {@link DFSConfigKeys#DFS_NAMENODE_EC_POLICIES_ENABLED_KEY} to the
 * comma-joined names of all policies returned by
 * {@link ErasureCodingPolicyManager#getSystemPolicies()}.
 *
 * @param conf configuration to update in place
 */
public static void enableAllECPolicies(Configuration conf) {
  final String allPolicyNames =
      Arrays.stream(ErasureCodingPolicyManager.getSystemPolicies())
          .map(ErasureCodingPolicy::getName)
          .collect(Collectors.joining(","));
  conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
      allPolicyNames);
}
/** class MyFile contains enough information to recreate the contents of
* a single file.
*/

View File

@ -29,7 +29,7 @@ public class TestDFSRSDefault10x4StripedInputStream extends
TestDFSStripedInputStream {
public ErasureCodingPolicy getEcPolicy() {
return ErasureCodingPolicyManager.getPolicyByPolicyID(
return ErasureCodingPolicyManager.getPolicyByID(
HdfsConstants.RS_10_4_POLICY_ID);
}
}

View File

@ -30,7 +30,7 @@ public class TestDFSRSDefault10x4StripedOutputStream
@Override
public ErasureCodingPolicy getEcPolicy() {
return ErasureCodingPolicyManager.getPolicyByPolicyID(
return ErasureCodingPolicyManager.getPolicyByID(
HdfsConstants.RS_10_4_POLICY_ID);
}
}

View File

@ -30,7 +30,7 @@ public class TestDFSRSDefault10x4StripedOutputStreamWithFailure
@Override
public ErasureCodingPolicy getEcPolicy() {
return ErasureCodingPolicyManager.getPolicyByPolicyID(
return ErasureCodingPolicyManager.getPolicyByID(
HdfsConstants.RS_10_4_POLICY_ID);
}
}

View File

@ -94,6 +94,8 @@ public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
getEcPolicy().getName());
if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set(
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,

View File

@ -88,6 +88,7 @@ public void setup() throws IOException {
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName());
}
DFSTestUtil.enableAllECPolicies(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().setErasureCodingPolicy("/", ecPolicy
.getName());

View File

@ -217,6 +217,7 @@ private void setup(Configuration conf) throws IOException {
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName());
}
DFSTestUtil.enableAllECPolicies(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.waitActive();
dfs = cluster.getFileSystem();

View File

@ -27,7 +27,7 @@
public class TestDFSXORStripedInputStream extends TestDFSStripedInputStream{
public ErasureCodingPolicy getEcPolicy() {
return ErasureCodingPolicyManager.getPolicyByPolicyID(
return ErasureCodingPolicyManager.getPolicyByID(
HdfsConstants.XOR_2_1_POLICY_ID);
}
}

View File

@ -29,7 +29,7 @@ public class TestDFSXORStripedOutputStream extends TestDFSStripedOutputStream{
@Override
public ErasureCodingPolicy getEcPolicy() {
return ErasureCodingPolicyManager.getPolicyByPolicyID(
return ErasureCodingPolicyManager.getPolicyByID(
HdfsConstants.XOR_2_1_POLICY_ID);
}
}

View File

@ -30,7 +30,7 @@ public class TestDFSXORStripedOutputStreamWithFailure
@Override
public ErasureCodingPolicy getEcPolicy() {
return ErasureCodingPolicyManager.getPolicyByPolicyID(
return ErasureCodingPolicyManager.getPolicyByID(
HdfsConstants.XOR_2_1_POLICY_ID);
}
}

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -34,6 +35,7 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -64,6 +66,7 @@ public class TestErasureCodingPolicies {
public void setupCluster() throws IOException {
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
DFSTestUtil.enableAllECPolicies(conf);
cluster = new MiniDFSCluster.Builder(conf).
numDataNodes(1).build();
cluster.waitActive();
@ -189,6 +192,40 @@ public void testBasicSetECPolicy()
} catch (IOException e) {
assertExceptionContains("erasure coding policy for a file", e);
}
// Verify that policies are successfully loaded even when policies
// are disabled
cluster.getConfiguration(0).set(
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, "");
cluster.restartNameNodes();
cluster.waitActive();
// No policies should be enabled after restart
Assert.assertTrue("No policies should be enabled after restart",
fs.getAllErasureCodingPolicies().isEmpty());
// Already set directory-level policies should still be in effect
Path disabledPolicy = new Path(dir1, "afterDisabled");
Assert.assertEquals("Dir does not have policy set",
EC_POLICY,
fs.getErasureCodingPolicy(dir1));
fs.create(disabledPolicy).close();
Assert.assertEquals("File did not inherit dir's policy",
EC_POLICY,
fs.getErasureCodingPolicy(disabledPolicy));
// Also check loading disabled EC policies from fsimage
fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
cluster.restartNameNodes();
Assert.assertEquals("Dir does not have policy set",
EC_POLICY,
fs.getErasureCodingPolicy(dir1));
Assert.assertEquals("File does not have policy set",
EC_POLICY,
fs.getErasureCodingPolicy(disabledPolicy));
}
@Test
@ -310,7 +347,7 @@ public void testSetInvalidPolicy()
+ "setting an invalid erasure coding policy");
} catch (Exception e) {
assertExceptionContains("Policy 'RS-4-2-128k' does not match " +
"any supported erasure coding policies",e);
"any enabled erasure coding policies", e);
}
}
@ -320,7 +357,7 @@ public void testGetAllErasureCodingPolicies() throws Exception {
.getSystemPolicies();
Collection<ErasureCodingPolicy> allECPolicies = fs
.getAllErasureCodingPolicies();
assertTrue("All system policies should be active",
assertTrue("All system policies should be enabled",
allECPolicies.containsAll(Arrays.asList(sysECPolicies)));
}

View File

@ -72,6 +72,7 @@ public void setup() throws IOException {
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName());
}
DFSTestUtil.enableAllECPolicies(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
dataBlocks + parityBlocks).build();
cluster.waitActive();
@ -139,7 +140,7 @@ public void testNestedEcPolicy() throws Exception {
final Path ec32FilePath = new Path(childDir, "ec_3_2_file");
final Path ec63FilePath2 = new Path(childDir, "ec_6_3_file_2");
final ErasureCodingPolicy ec32Policy = ErasureCodingPolicyManager
.getPolicyByPolicyID(HdfsConstants.RS_3_2_POLICY_ID);
.getPolicyByID(HdfsConstants.RS_3_2_POLICY_ID);
fs.mkdirs(parentDir);
fs.setErasureCodingPolicy(parentDir, ecPolicy.getName());
@ -237,7 +238,7 @@ public void testChangeRootDirEcPolicy() throws Exception {
final Path ec63FilePath = new Path(rootPath, "ec_6_3_file");
final Path ec32FilePath = new Path(rootPath, "ec_3_2_file");
final ErasureCodingPolicy ec32Policy = ErasureCodingPolicyManager
.getPolicyByPolicyID(HdfsConstants.RS_3_2_POLICY_ID);
.getPolicyByID(HdfsConstants.RS_3_2_POLICY_ID);
fs.unsetErasureCodingPolicy(rootPath);
fs.setErasureCodingPolicy(rootPath, ecPolicy.getName());

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
 * Test that ErasureCodingPolicyManager correctly parses the set of enabled
 * erasure coding policies from configuration and exposes this information.
 */
public class TestEnabledECPolicies {

  /** All policies known to the system, whether or not they are enabled. */
  private static final ErasureCodingPolicy[] SYSTEM_POLICIES =
      ErasureCodingPolicyManager.getSystemPolicies();

  /** Per-test timeout; the value is given in milliseconds. */
  @Rule
  public Timeout testTimeout = new Timeout(60000);

  /**
   * Asserts that constructing a manager with the given enabled-policies
   * config value is rejected with an IllegalArgumentException whose message
   * contains "is not a valid policy".
   *
   * @param value raw value for the enabled EC policies config key
   */
  private void expectInvalidPolicy(String value) {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
        value);
    try {
      new ErasureCodingPolicyManager(conf);
      fail("Expected exception when instantiating ECPolicyManager");
    } catch (IllegalArgumentException e) {
      GenericTestUtils.assertExceptionContains("is not a valid policy", e);
    }
  }

  /**
   * Asserts that the given enabled-policies config value is accepted and
   * results in exactly {@code numEnabled} enabled policies.
   *
   * @param value raw value for the enabled EC policies config key
   * @param numEnabled expected number of enabled policies
   */
  private void expectValidPolicy(String value, final int numEnabled) throws
      Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
        value);
    ErasureCodingPolicyManager manager = new ErasureCodingPolicyManager(conf);
    assertEquals("Incorrect number of enabled policies",
        numEnabled, manager.getEnabledPolicies().length);
  }

  /** Config values that name an unknown policy must be rejected. */
  @Test
  public void testInvalid() throws Exception {
    // Test first with an invalid policy
    expectInvalidPolicy("not-a-policy");
    // Test with an invalid policy and a valid policy
    expectInvalidPolicy("not-a-policy," + ErasureCodingPolicyManager
        .getSystemDefaultPolicy().getName());
    // Test with a valid and an invalid policy
    expectInvalidPolicy(ErasureCodingPolicyManager
        .getSystemDefaultPolicy().getName() + ", not-a-policy");
    // Some more invalid values
    expectInvalidPolicy("not-a-policy, ");
    expectInvalidPolicy(" ,not-a-policy, ");
  }

  /** Well-formed values, including ones with empty entries, are accepted. */
  @Test
  public void testValid() throws Exception {
    String ecPolicyName = ErasureCodingPolicyManager.getSystemDefaultPolicy()
        .getName();
    expectValidPolicy(ecPolicyName, 1);
    // Trailing separators and blank entries are expected to be ignored
    expectValidPolicy(ecPolicyName + ", ", 1);
    expectValidPolicy(",", 0);
  }

  /** Exercises the enabled-policy getters with zero, one and two policies. */
  @Test
  public void testGetPolicies() throws Exception {
    ErasureCodingPolicy[] enabledPolicies;
    // Enable no policies. This previously enabled two policies, which made
    // it a duplicate of the last case and left the empty set untested.
    // NOTE(review): this sets the config key to an empty string, which
    // should yield zero enabled policies (like "," in testValid) -- confirm.
    enabledPolicies = new ErasureCodingPolicy[] {};
    testGetPolicies(enabledPolicies);
    // Enable one policy
    enabledPolicies = new ErasureCodingPolicy[]
        {SYSTEM_POLICIES[1]};
    testGetPolicies(enabledPolicies);
    // Enable two policies
    enabledPolicies = new ErasureCodingPolicy[]
        {SYSTEM_POLICIES[1], SYSTEM_POLICIES[2]};
    testGetPolicies(enabledPolicies);
  }

  /**
   * Builds a manager with exactly {@code enabledPolicies} enabled and checks
   * that getEnabledPolicies() returns that set without duplicates, and that
   * getEnabledPolicyByName() returns non-null only for enabled policies.
   *
   * @param enabledPolicies the policies to enable through configuration
   */
  private void testGetPolicies(ErasureCodingPolicy[] enabledPolicies)
      throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
        Arrays.stream(enabledPolicies)
            .map(ErasureCodingPolicy::getName)
            .collect(Collectors.joining(", ")));
    ErasureCodingPolicyManager manager = new ErasureCodingPolicyManager(conf);

    // Check that returned values are unique
    Set<String> found = new HashSet<>();
    for (ErasureCodingPolicy p : manager.getEnabledPolicies()) {
      // Set.add returns false when the name is already present
      Assert.assertTrue("Duplicate policy name found: " + p.getName(),
          found.add(p.getName()));
    }
    // Check that the policies specified in conf are found
    for (ErasureCodingPolicy p : enabledPolicies) {
      Assert.assertTrue("Did not find specified EC policy " + p.getName(),
          found.contains(p.getName()));
    }
    Assert.assertEquals(enabledPolicies.length, found.size());

    // Check that getEnabledPolicyByName only returns enabled policies
    for (ErasureCodingPolicy p : SYSTEM_POLICIES) {
      if (found.contains(p.getName())) {
        // Enabled policy should be present
        Assert.assertNotNull(
            "getEnabledPolicyByName did not find enabled policy "
                + p.getName(),
            manager.getEnabledPolicyByName(p.getName()));
      } else {
        // Disabled policy should not be present
        Assert.assertNull(
            "getEnabledPolicyByName found disabled policy " + p.getName(),
            manager.getEnabledPolicyByName(p.getName()));
      }
    }
  }
}

View File

@ -77,7 +77,7 @@ public class TestFSImage {
private static final String HADOOP_2_7_ZER0_BLOCK_SIZE_TGZ =
"image-with-zero-block-size.tar.gz";
private static final ErasureCodingPolicy testECPolicy =
ErasureCodingPolicyManager.getPolicyByPolicyID(
ErasureCodingPolicyManager.getPolicyByID(
HdfsConstants.RS_10_4_POLICY_ID);
@Test
@ -240,6 +240,7 @@ private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration con
@Test
public void testSaveAndLoadStripedINodeFile() throws IOException{
Configuration conf = new Configuration();
DFSTestUtil.enableAllECPolicies(conf);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).build();
@ -260,6 +261,7 @@ public void testSaveAndLoadStripedINodeFile() throws IOException{
public void testSaveAndLoadStripedINodeFileUC() throws IOException {
// construct a INode with StripedBlock for saving and loading
Configuration conf = new Configuration();
DFSTestUtil.enableAllECPolicies(conf);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).build();
@ -459,6 +461,7 @@ public void testSupportBlockGroup() throws Exception {
final int BLOCK_SIZE = 8 * 1024 * 1024;
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
DFSTestUtil.enableAllECPolicies(conf);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE)
@ -468,7 +471,7 @@ public void testSupportBlockGroup() throws Exception {
Path parentDir = new Path("/ec-10-4");
Path childDir = new Path(parentDir, "ec-3-2");
ErasureCodingPolicy ec32Policy = ErasureCodingPolicyManager
.getPolicyByPolicyID(HdfsConstants.RS_3_2_POLICY_ID);
.getPolicyByID(HdfsConstants.RS_3_2_POLICY_ID);
// Create directories and files
fs.mkdirs(parentDir);
@ -516,7 +519,7 @@ public void testSupportBlockGroup() throws Exception {
// check the information of file_3_2
inode = fsn.dir.getINode(file_3_2.toString()).asFile();
assertTrue(inode.isStriped());
assertEquals(ErasureCodingPolicyManager.getPolicyByPolicyID(
assertEquals(ErasureCodingPolicyManager.getPolicyByID(
HdfsConstants.RS_3_2_POLICY_ID).getId(),
inode.getErasureCodingPolicyID());
blks = inode.getBlocks();

View File

@ -124,6 +124,10 @@ public static void createOriginalFSImage() throws IOException {
tempDir = Files.createTempDir();
MiniDFSCluster cluster = null;
try {
final ErasureCodingPolicy ecPolicy =
ErasureCodingPolicyManager.getPolicyByID(
HdfsConstants.XOR_2_1_POLICY_ID);
Configuration conf = new Configuration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000);
@ -134,6 +138,8 @@ public static void createOriginalFSImage() throws IOException {
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
"RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
ecPolicy.getName());
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
DistributedFileSystem hdfs = cluster.getFileSystem();
@ -233,9 +239,6 @@ public static void createOriginalFSImage() throws IOException {
Path ecDir = new Path("/ec");
hdfs.mkdirs(ecDir);
dirCount++;
ErasureCodingPolicy ecPolicy =
ErasureCodingPolicyManager.getPolicyByPolicyID(
HdfsConstants.XOR_2_1_POLICY_ID);
hdfs.getClient().setErasureCodingPolicy(ecDir.toString(),
ecPolicy.getName());
writtenFiles.put(ecDir.toString(), hdfs.getFileStatus(ecDir));
@ -409,7 +412,7 @@ public void endElement(String uri, String localName, String qName)
Assert.assertEquals("INode '"
+ currentInodeName + "' has unexpected EC Policy!",
Byte.parseByte(currentECPolicy),
ErasureCodingPolicyManager.getPolicyByPolicyID(
ErasureCodingPolicyManager.getPolicyByID(
HdfsConstants.XOR_2_1_POLICY_ID).getId());
Assert.assertEquals("INode '"
+ currentInodeName + "' has unexpected replication!",

View File

@ -119,7 +119,7 @@
<comparators>
<comparator>
<type>SubstringComparator</type>
<expected-output>Get the list of supported erasure coding policies</expected-output>
<expected-output>Get the list of enabled erasure coding policies</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
@ -359,7 +359,24 @@
<comparators>
<comparator>
<type>SubstringComparator</type>
<expected-output>Policy 'invalidpolicy' does not match any supported erasure coding policies.</expected-output>
<expected-output>Policy 'invalidpolicy' does not match any enabled erasure coding policies</expected-output>
</comparator>
</comparators>
</test>
<test>
<description>setPolicy : illegal parameters - RS-10-4-64k</description>
<test-commands>
<command>-fs NAMENODE -mkdir /ecdir</command>
<ec-admin-command>-fs NAMENODE -setPolicy -policy RS-10-4-64k -path /ecdir</ec-admin-command>
</test-commands>
<cleanup-commands>
<command>-fs NAMENODE -rmdir /ecdir</command>
</cleanup-commands>
<comparators>
<comparator>
<type>SubstringComparator</type>
<expected-output>Policy 'RS-10-4-64k' does not match any enabled erasure coding policies</expected-output>
</comparator>
</comparators>
</test>