HDFS-11975. Provide a system-default EC policy. Contributed by Huichun Lu
This commit is contained in:
parent
ad2a350662
commit
a53b8b6fdc
|
@ -2515,8 +2515,6 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
public void setErasureCodingPolicy(final Path path,
|
public void setErasureCodingPolicy(final Path path,
|
||||||
final String ecPolicyName) throws IOException {
|
final String ecPolicyName) throws IOException {
|
||||||
Path absF = fixRelativePart(path);
|
Path absF = fixRelativePart(path);
|
||||||
Preconditions.checkNotNull(ecPolicyName, "Erasure coding policy cannot be" +
|
|
||||||
" null.");
|
|
||||||
new FileSystemLinkResolver<Void>() {
|
new FileSystemLinkResolver<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void doCall(final Path p) throws IOException {
|
public Void doCall(final Path p) throws IOException {
|
||||||
|
|
|
@ -1518,7 +1518,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
final SetErasureCodingPolicyRequestProto.Builder builder =
|
final SetErasureCodingPolicyRequestProto.Builder builder =
|
||||||
SetErasureCodingPolicyRequestProto.newBuilder();
|
SetErasureCodingPolicyRequestProto.newBuilder();
|
||||||
builder.setSrc(src);
|
builder.setSrc(src);
|
||||||
builder.setEcPolicyName(ecPolicyName);
|
if (ecPolicyName != null) {
|
||||||
|
builder.setEcPolicyName(ecPolicyName);
|
||||||
|
}
|
||||||
SetErasureCodingPolicyRequestProto req = builder.build();
|
SetErasureCodingPolicyRequestProto req = builder.build();
|
||||||
try {
|
try {
|
||||||
rpcProxy.setErasureCodingPolicy(null, req);
|
rpcProxy.setErasureCodingPolicy(null, req);
|
||||||
|
|
|
@ -25,7 +25,7 @@ import "hdfs.proto";
|
||||||
|
|
||||||
message SetErasureCodingPolicyRequestProto {
|
message SetErasureCodingPolicyRequestProto {
|
||||||
required string src = 1;
|
required string src = 1;
|
||||||
required string ecPolicyName = 2;
|
optional string ecPolicyName = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SetErasureCodingPolicyResponseProto {
|
message SetErasureCodingPolicyResponseProto {
|
||||||
|
|
|
@ -564,6 +564,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final String DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT = "";
|
public static final String DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT = "";
|
||||||
public static final String DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_KEY = "dfs.namenode.ec.policies.max.cellsize";
|
public static final String DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_KEY = "dfs.namenode.ec.policies.max.cellsize";
|
||||||
public static final int DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_DEFAULT = 4 * 1024 * 1024;
|
public static final int DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_DEFAULT = 4 * 1024 * 1024;
|
||||||
|
public static final String DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY =
|
||||||
|
"dfs.namenode.ec.system.default.policy";
|
||||||
|
public static final String DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT =
|
||||||
|
"RS-6-3-64k";
|
||||||
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedread.threads";
|
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedread.threads";
|
||||||
public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT = 20;
|
public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT = 20;
|
||||||
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.ec.reconstruction.stripedread.buffer.size";
|
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.ec.reconstruction.stripedread.buffer.size";
|
||||||
|
|
|
@ -1488,7 +1488,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
||||||
RpcController controller, SetErasureCodingPolicyRequestProto req)
|
RpcController controller, SetErasureCodingPolicyRequestProto req)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
try {
|
try {
|
||||||
server.setErasureCodingPolicy(req.getSrc(), req.getEcPolicyName());
|
String ecPolicyName = req.hasEcPolicyName() ?
|
||||||
|
req.getEcPolicyName() : null;
|
||||||
|
server.setErasureCodingPolicy(req.getSrc(), ecPolicyName);
|
||||||
return SetErasureCodingPolicyResponseProto.newBuilder().build();
|
return SetErasureCodingPolicyResponseProto.newBuilder().build();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.ArrayUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
@ -92,9 +93,14 @@ public final class ErasureCodingPolicyManager {
|
||||||
|
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
// Populate the list of enabled policies from configuration
|
// Populate the list of enabled policies from configuration
|
||||||
final String[] policyNames = conf.getTrimmedStrings(
|
final String[] enablePolicyNames = conf.getTrimmedStrings(
|
||||||
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT);
|
||||||
|
final String defaultPolicyName = conf.getTrimmed(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);
|
||||||
|
final String[] policyNames =
|
||||||
|
(String[]) ArrayUtils.add(enablePolicyNames, defaultPolicyName);
|
||||||
this.userPoliciesByID = new TreeMap<>();
|
this.userPoliciesByID = new TreeMap<>();
|
||||||
this.userPoliciesByName = new TreeMap<>();
|
this.userPoliciesByName = new TreeMap<>();
|
||||||
this.removedPoliciesByName = new TreeMap<>();
|
this.removedPoliciesByName = new TreeMap<>();
|
||||||
|
|
|
@ -251,13 +251,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
|
|
||||||
private final String minimumDataNodeVersion;
|
private final String minimumDataNodeVersion;
|
||||||
|
|
||||||
|
private final String defaultECPolicyName;
|
||||||
|
|
||||||
public NameNodeRpcServer(Configuration conf, NameNode nn)
|
public NameNodeRpcServer(Configuration conf, NameNode nn)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.nn = nn;
|
this.nn = nn;
|
||||||
this.namesystem = nn.getNamesystem();
|
this.namesystem = nn.getNamesystem();
|
||||||
this.retryCache = namesystem.getRetryCache();
|
this.retryCache = namesystem.getRetryCache();
|
||||||
this.metrics = NameNode.getNameNodeMetrics();
|
this.metrics = NameNode.getNameNodeMetrics();
|
||||||
|
|
||||||
int handlerCount =
|
int handlerCount =
|
||||||
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
|
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
|
||||||
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
|
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
|
||||||
|
@ -490,6 +492,10 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
|
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
|
||||||
|
|
||||||
|
defaultECPolicyName = conf.get(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);
|
||||||
|
|
||||||
// Set terse exception whose stack trace won't be logged
|
// Set terse exception whose stack trace won't be logged
|
||||||
clientRpcServer.addTerseExceptions(SafeModeException.class,
|
clientRpcServer.addTerseExceptions(SafeModeException.class,
|
||||||
FileNotFoundException.class,
|
FileNotFoundException.class,
|
||||||
|
@ -2055,6 +2061,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
}
|
}
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
|
if (ecPolicyName == null) {
|
||||||
|
ecPolicyName = defaultECPolicyName;
|
||||||
|
LOG.trace("No policy name is specified, " +
|
||||||
|
"set the default policy name instead");
|
||||||
|
}
|
||||||
|
LOG.trace("Set erasure coding policy " + ecPolicyName + " on " + src);
|
||||||
namesystem.setErasureCodingPolicy(src, ecPolicyName, cacheEntry != null);
|
namesystem.setErasureCodingPolicy(src, ecPolicyName, cacheEntry != null);
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -335,11 +335,6 @@ public class ECAdmin extends Configured implements Tool {
|
||||||
|
|
||||||
final String ecPolicyName = StringUtils.popOptionWithArgument("-policy",
|
final String ecPolicyName = StringUtils.popOptionWithArgument("-policy",
|
||||||
args);
|
args);
|
||||||
if (ecPolicyName == null) {
|
|
||||||
System.err.println("Please specify the policy name.\nUsage: " +
|
|
||||||
getLongUsage());
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (args.size() > 0) {
|
if (args.size() > 0) {
|
||||||
System.err.println(getName() + ": Too many arguments");
|
System.err.println(getName() + ": Too many arguments");
|
||||||
|
@ -350,8 +345,13 @@ public class ECAdmin extends Configured implements Tool {
|
||||||
final DistributedFileSystem dfs = AdminHelper.getDFS(p.toUri(), conf);
|
final DistributedFileSystem dfs = AdminHelper.getDFS(p.toUri(), conf);
|
||||||
try {
|
try {
|
||||||
dfs.setErasureCodingPolicy(p, ecPolicyName);
|
dfs.setErasureCodingPolicy(p, ecPolicyName);
|
||||||
System.out.println("Set erasure coding policy " + ecPolicyName +
|
if (ecPolicyName == null){
|
||||||
" on " + path);
|
System.out.println("Set default erasure coding policy" +
|
||||||
|
" on " + path);
|
||||||
|
} else {
|
||||||
|
System.out.println("Set erasure coding policy " + ecPolicyName +
|
||||||
|
" on " + path);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
System.err.println(AdminHelper.prettifyException(e));
|
System.err.println(AdminHelper.prettifyException(e));
|
||||||
return 2;
|
return 2;
|
||||||
|
|
|
@ -2975,6 +2975,14 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.ec.system.default.policy</name>
|
||||||
|
<value>RS-6-3-64k</value>
|
||||||
|
<description>The default erasure coding policy name will be used
|
||||||
|
on the path if no policy name is passed.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.ec.policies.max.cellsize</name>
|
<name>dfs.namenode.ec.policies.max.cellsize</name>
|
||||||
<value>4194304</value>
|
<value>4194304</value>
|
||||||
|
|
|
@ -117,6 +117,11 @@ Deployment
|
||||||
be more appropriate. If the administrator only cares about node-level fault-tolerance, `RS-10-4-64k` would still be appropriate as long as
|
be more appropriate. If the administrator only cares about node-level fault-tolerance, `RS-10-4-64k` would still be appropriate as long as
|
||||||
there are at least 14 DataNodes in the cluster.
|
there are at least 14 DataNodes in the cluster.
|
||||||
|
|
||||||
|
A system default EC policy can be configured via 'dfs.namenode.ec.system.default.policy' configuration. With this configuration,
|
||||||
|
the default EC policy will be used when no policy name is passed as an argument in the '-setPolicy' command.
|
||||||
|
|
||||||
|
By default, the 'dfs.namenode.ec.system.default.policy' is "RS-6-3-64k".
|
||||||
|
|
||||||
The codec implementations for Reed-Solomon and XOR can be configured with the following client and DataNode configuration keys:
|
The codec implementations for Reed-Solomon and XOR can be configured with the following client and DataNode configuration keys:
|
||||||
`io.erasurecode.codec.rs.rawcoders` for the default RS codec,
|
`io.erasurecode.codec.rs.rawcoders` for the default RS codec,
|
||||||
`io.erasurecode.codec.rs-legacy.rawcoders` for the legacy RS codec,
|
`io.erasurecode.codec.rs-legacy.rawcoders` for the legacy RS codec,
|
||||||
|
@ -167,6 +172,9 @@ Below are the details about each command.
|
||||||
`path`: An directory in HDFS. This is a mandatory parameter. Setting a policy only affects newly created files, and does not affect existing files.
|
`path`: An directory in HDFS. This is a mandatory parameter. Setting a policy only affects newly created files, and does not affect existing files.
|
||||||
|
|
||||||
`policyName`: The erasure coding policy to be used for files under this directory.
|
`policyName`: The erasure coding policy to be used for files under this directory.
|
||||||
|
This parameter can be omitted if a 'dfs.namenode.ec.system.default.policy' configuration is set.
|
||||||
|
The EC policy of the path will be set with the default value in configuration.
|
||||||
|
|
||||||
|
|
||||||
* `[-getPolicy -path <path>]`
|
* `[-getPolicy -path <path>]`
|
||||||
|
|
||||||
|
|
|
@ -209,9 +209,9 @@ public class TestErasureCodingPolicies {
|
||||||
cluster.restartNameNodes();
|
cluster.restartNameNodes();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
// No policies should be enabled after restart
|
// Only default policy should be enabled after restart
|
||||||
Assert.assertTrue("No policies should be enabled after restart",
|
Assert.assertEquals("Only default policy should be enabled after restart",
|
||||||
fs.getAllErasureCodingPolicies().isEmpty());
|
1, fs.getAllErasureCodingPolicies().size());
|
||||||
|
|
||||||
// Already set directory-level policies should still be in effect
|
// Already set directory-level policies should still be in effect
|
||||||
Path disabledPolicy = new Path(dir1, "afterDisabled");
|
Path disabledPolicy = new Path(dir1, "afterDisabled");
|
||||||
|
@ -359,6 +359,24 @@ public class TestErasureCodingPolicies {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetDefaultPolicy()
|
||||||
|
throws IOException {
|
||||||
|
String src = "/ecDir";
|
||||||
|
final Path ecDir = new Path(src);
|
||||||
|
try {
|
||||||
|
fs.mkdir(ecDir, FsPermission.getDirDefault());
|
||||||
|
fs.getClient().setErasureCodingPolicy(src, null);
|
||||||
|
String actualECPolicyName = fs.getClient().
|
||||||
|
getErasureCodingPolicy(src).getName();
|
||||||
|
String expectedECPolicyName =
|
||||||
|
conf.get(DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);
|
||||||
|
assertEquals(expectedECPolicyName, actualECPolicyName);
|
||||||
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetAllErasureCodingPolicies() throws Exception {
|
public void testGetAllErasureCodingPolicies() throws Exception {
|
||||||
Collection<ErasureCodingPolicy> allECPolicies = fs
|
Collection<ErasureCodingPolicy> allECPolicies = fs
|
||||||
|
|
|
@ -75,7 +75,7 @@ public class TestEnabledECPolicies {
|
||||||
String defaultECPolicies = conf.get(
|
String defaultECPolicies = conf.get(
|
||||||
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT);
|
||||||
expectValidPolicy(defaultECPolicies, 0);
|
expectValidPolicy(defaultECPolicies, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -98,10 +98,10 @@ public class TestEnabledECPolicies {
|
||||||
String ecPolicyName = StripedFileTestUtil.getDefaultECPolicy().getName();
|
String ecPolicyName = StripedFileTestUtil.getDefaultECPolicy().getName();
|
||||||
expectValidPolicy(ecPolicyName, 1);
|
expectValidPolicy(ecPolicyName, 1);
|
||||||
expectValidPolicy(ecPolicyName + ", ", 1);
|
expectValidPolicy(ecPolicyName + ", ", 1);
|
||||||
expectValidPolicy(",", 0);
|
expectValidPolicy(",", 1);
|
||||||
expectValidPolicy(", " + ecPolicyName, 1);
|
expectValidPolicy(", " + ecPolicyName, 1);
|
||||||
expectValidPolicy(" ", 0);
|
expectValidPolicy(" ", 1);
|
||||||
expectValidPolicy(" , ", 0);
|
expectValidPolicy(" , ", 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -147,7 +147,7 @@ public class TestEnabledECPolicies {
|
||||||
Assert.assertTrue("Did not find specified EC policy " + p.getName(),
|
Assert.assertTrue("Did not find specified EC policy " + p.getName(),
|
||||||
found.contains(p.getName()));
|
found.contains(p.getName()));
|
||||||
}
|
}
|
||||||
Assert.assertEquals(enabledPolicies.length, found.size());
|
Assert.assertEquals(enabledPolicies.length, found.size()-1);
|
||||||
// Check that getEnabledPolicyByName only returns enabled policies
|
// Check that getEnabledPolicyByName only returns enabled policies
|
||||||
for (ErasureCodingPolicy p: SystemErasureCodingPolicies.getPolicies()) {
|
for (ErasureCodingPolicy p: SystemErasureCodingPolicies.getPolicies()) {
|
||||||
if (found.contains(p.getName())) {
|
if (found.contains(p.getName())) {
|
||||||
|
|
|
@ -552,6 +552,41 @@
|
||||||
</comparators>
|
</comparators>
|
||||||
</test>
|
</test>
|
||||||
|
|
||||||
|
<test>
|
||||||
|
<description>setPolicy : set erasure coding policy without given a specific policy name</description>
|
||||||
|
<test-commands>
|
||||||
|
<command>-fs NAMENODE -mkdir /ecdir</command>
|
||||||
|
<ec-admin-command>-fs NAMENODE -setPolicy -path /ecdir</ec-admin-command>
|
||||||
|
</test-commands>
|
||||||
|
<cleanup-commands>
|
||||||
|
<command>-fs NAMENODE -rmdir /ecdir</command>
|
||||||
|
</cleanup-commands>
|
||||||
|
<comparators>
|
||||||
|
<comparator>
|
||||||
|
<type>SubstringComparator</type>
|
||||||
|
<expected-output>Set default erasure coding policy on /ecdir</expected-output>
|
||||||
|
</comparator>
|
||||||
|
</comparators>
|
||||||
|
</test>
|
||||||
|
|
||||||
|
<test>
|
||||||
|
<description>getPolicy: get the default policy after setPolicy without given a specific policy name</description>
|
||||||
|
<test-commands>
|
||||||
|
<command>-fs NAMENODE -mkdir /ecdir</command>
|
||||||
|
<ec-admin-command>-fs NAMENODE -setPolicy -path /ecdir</ec-admin-command>
|
||||||
|
<ec-admin-command>-fs NAMENODE -getPolicy -path /ecdir</ec-admin-command>
|
||||||
|
</test-commands>
|
||||||
|
<cleanup-commands>
|
||||||
|
<command>-fs NAMENODE -rmdir /ecdir</command>
|
||||||
|
</cleanup-commands>
|
||||||
|
<comparators>
|
||||||
|
<comparator>
|
||||||
|
<type>SubstringComparator</type>
|
||||||
|
<expected-output>RS-6-3-64k</expected-output>
|
||||||
|
</comparator>
|
||||||
|
</comparators>
|
||||||
|
</test>
|
||||||
|
|
||||||
<test>
|
<test>
|
||||||
<description>getPolicy : illegal parameters - path is missing</description>
|
<description>getPolicy : illegal parameters - path is missing</description>
|
||||||
<test-commands>
|
<test-commands>
|
||||||
|
|
Loading…
Reference in New Issue