HDFS-11793. Allow to enable user defined erasure coding policy. Contributed by Sammi Chen
This commit is contained in:
parent
52661e0912
commit
a62be38a5e
|
@ -127,28 +127,20 @@ public final class CodecRegistry {
|
||||||
/**
|
/**
|
||||||
* Get all coder names of the given codec.
|
* Get all coder names of the given codec.
|
||||||
* @param codecName the name of codec
|
* @param codecName the name of codec
|
||||||
* @return an array of all coder names
|
* @return an array of all coder names, null if not exist
|
||||||
*/
|
*/
|
||||||
public String[] getCoderNames(String codecName) {
|
public String[] getCoderNames(String codecName) {
|
||||||
String[] coderNames = coderNameMap.get(codecName);
|
String[] coderNames = coderNameMap.get(codecName);
|
||||||
if (coderNames == null) {
|
|
||||||
throw new IllegalArgumentException("No available raw coder factory for "
|
|
||||||
+ codecName);
|
|
||||||
}
|
|
||||||
return coderNames;
|
return coderNames;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get all coder factories of the given codec.
|
* Get all coder factories of the given codec.
|
||||||
* @param codecName the name of codec
|
* @param codecName the name of codec
|
||||||
* @return a list of all coder factories
|
* @return a list of all coder factories, null if not exist
|
||||||
*/
|
*/
|
||||||
public List<RawErasureCoderFactory> getCoders(String codecName) {
|
public List<RawErasureCoderFactory> getCoders(String codecName) {
|
||||||
List<RawErasureCoderFactory> coders = coderMap.get(codecName);
|
List<RawErasureCoderFactory> coders = coderMap.get(codecName);
|
||||||
if (coders == null) {
|
|
||||||
throw new IllegalArgumentException("No available raw coder factory for "
|
|
||||||
+ codecName);
|
|
||||||
}
|
|
||||||
return coders;
|
return coders;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,7 +156,7 @@ public final class CodecRegistry {
|
||||||
* Get a specific coder factory defined by codec name and coder name.
|
* Get a specific coder factory defined by codec name and coder name.
|
||||||
* @param codecName name of the codec
|
* @param codecName name of the codec
|
||||||
* @param coderName name of the coder
|
* @param coderName name of the coder
|
||||||
* @return the specific coder
|
* @return the specific coder, null if not exist
|
||||||
*/
|
*/
|
||||||
public RawErasureCoderFactory getCoderByName(
|
public RawErasureCoderFactory getCoderByName(
|
||||||
String codecName, String coderName) {
|
String codecName, String coderName) {
|
||||||
|
@ -176,10 +168,7 @@ public final class CodecRegistry {
|
||||||
return coder;
|
return coder;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
// if not found, throw exception
|
|
||||||
throw new IllegalArgumentException("No implementation for coder "
|
|
||||||
+ coderName + " of codec " + codecName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -153,6 +153,10 @@ public final class CodecUtil {
|
||||||
return fact;
|
return fact;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean hasCodec(String codecName) {
|
||||||
|
return (CodecRegistry.getInstance().getCoderNames(codecName) != null);
|
||||||
|
}
|
||||||
|
|
||||||
// Return a list of coder names
|
// Return a list of coder names
|
||||||
private static String[] getRawCoderNames(
|
private static String[] getRawCoderNames(
|
||||||
Configuration conf, String codecName) {
|
Configuration conf, String codecName) {
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -67,10 +68,11 @@ public class TestCodecRegistry {
|
||||||
assertTrue(coders.get(1) instanceof XORRawErasureCoderFactory);
|
assertTrue(coders.get(1) instanceof XORRawErasureCoderFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException.class)
|
@Test
|
||||||
public void testGetCodersWrong() {
|
public void testGetCodersWrong() {
|
||||||
List<RawErasureCoderFactory> coders =
|
List<RawErasureCoderFactory> coders =
|
||||||
CodecRegistry.getInstance().getCoders("WRONG_CODEC");
|
CodecRegistry.getInstance().getCoders("WRONG_CODEC");
|
||||||
|
assertNull(coders);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -123,10 +125,11 @@ public class TestCodecRegistry {
|
||||||
assertTrue(coder instanceof NativeXORRawErasureCoderFactory);
|
assertTrue(coder instanceof NativeXORRawErasureCoderFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException.class)
|
@Test
|
||||||
public void testGetCoderByNameWrong() {
|
public void testGetCoderByNameWrong() {
|
||||||
RawErasureCoderFactory coder = CodecRegistry.getInstance().
|
RawErasureCoderFactory coder = CodecRegistry.getInstance().
|
||||||
getCoderByName(ErasureCodeConstants.RS_CODEC_NAME, "WRONG_RS");
|
getCoderByName(ErasureCodeConstants.RS_CODEC_NAME, "WRONG_RS");
|
||||||
|
assertNull(coder);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -101,7 +101,7 @@ import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||||
|
@ -2770,7 +2770,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public AddingECPolicyResponse[] addErasureCodingPolicies(
|
public AddECPolicyResponse[] addErasureCodingPolicies(
|
||||||
ErasureCodingPolicy[] policies) throws IOException {
|
ErasureCodingPolicy[] policies) throws IOException {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
return namenode.addErasureCodingPolicies(policies);
|
return namenode.addErasureCodingPolicies(policies);
|
||||||
|
|
|
@ -71,7 +71,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||||
import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
|
import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
|
||||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
||||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||||
|
@ -2549,13 +2549,16 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add Erasure coding policies to HDFS.
|
* Add Erasure coding policies to HDFS. For each policy input, schema and
|
||||||
|
* cellSize are musts, name and id are ignored. They will be automatically
|
||||||
|
* created and assigned by Namenode once the policy is successfully added, and
|
||||||
|
* will be returned in the response.
|
||||||
*
|
*
|
||||||
* @param policies The user defined ec policy list to add.
|
* @param policies The user defined ec policy list to add.
|
||||||
* @return Return the response list of adding operations.
|
* @return Return the response list of adding operations.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public AddingECPolicyResponse[] addErasureCodingPolicies(
|
public AddECPolicyResponse[] addErasureCodingPolicies(
|
||||||
ErasureCodingPolicy[] policies) throws IOException {
|
ErasureCodingPolicy[] policies) throws IOException {
|
||||||
return dfs.addErasureCodingPolicies(policies);
|
return dfs.addErasureCodingPolicies(policies);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,26 +18,26 @@
|
||||||
package org.apache.hadoop.hdfs.protocol;
|
package org.apache.hadoop.hdfs.protocol;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A response of adding an ErasureCoding policy.
|
* A response of add an ErasureCoding policy.
|
||||||
*/
|
*/
|
||||||
public class AddingECPolicyResponse {
|
public class AddECPolicyResponse {
|
||||||
private boolean succeed;
|
private boolean succeed;
|
||||||
private ErasureCodingPolicy policy;
|
private ErasureCodingPolicy policy;
|
||||||
private String errorMsg;
|
private String errorMsg;
|
||||||
|
|
||||||
public AddingECPolicyResponse(ErasureCodingPolicy policy) {
|
public AddECPolicyResponse(ErasureCodingPolicy policy) {
|
||||||
this.policy = policy;
|
this.policy = policy;
|
||||||
this.succeed = true;
|
this.succeed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AddingECPolicyResponse(ErasureCodingPolicy policy,
|
public AddECPolicyResponse(ErasureCodingPolicy policy,
|
||||||
String errorMsg) {
|
String errorMsg) {
|
||||||
this.policy = policy;
|
this.policy = policy;
|
||||||
this.errorMsg = errorMsg;
|
this.errorMsg = errorMsg;
|
||||||
this.succeed = false;
|
this.succeed = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AddingECPolicyResponse(ErasureCodingPolicy policy,
|
public AddECPolicyResponse(ErasureCodingPolicy policy,
|
||||||
IllegalECPolicyException e) {
|
IllegalECPolicyException e) {
|
||||||
this(policy, e.getMessage());
|
this(policy, e.getMessage());
|
||||||
}
|
}
|
|
@ -1526,14 +1526,17 @@ public interface ClientProtocol {
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add Erasure coding policies.
|
* Add Erasure coding policies to HDFS. For each policy input, schema and
|
||||||
|
* cellSize are musts, name and id are ignored. They will be automatically
|
||||||
|
* created and assigned by Namenode once the policy is successfully added, and
|
||||||
|
* will be returned in the response.
|
||||||
*
|
*
|
||||||
* @param policies The user defined ec policy list to add.
|
* @param policies The user defined ec policy list to add.
|
||||||
* @return Return the response list of adding operations.
|
* @return Return the response list of adding operations.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@AtMostOnce
|
@AtMostOnce
|
||||||
AddingECPolicyResponse[] addErasureCodingPolicies(
|
AddECPolicyResponse[] addErasureCodingPolicies(
|
||||||
ErasureCodingPolicy[] policies) throws IOException;
|
ErasureCodingPolicy[] policies) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -41,6 +41,8 @@ public final class ErasureCodingPolicy {
|
||||||
Preconditions.checkNotNull(name);
|
Preconditions.checkNotNull(name);
|
||||||
Preconditions.checkNotNull(schema);
|
Preconditions.checkNotNull(schema);
|
||||||
Preconditions.checkArgument(cellSize > 0, "cellSize must be positive");
|
Preconditions.checkArgument(cellSize > 0, "cellSize must be positive");
|
||||||
|
Preconditions.checkArgument(cellSize % 1024 == 0,
|
||||||
|
"cellSize must be 1024 aligned");
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.schema = schema;
|
this.schema = schema;
|
||||||
this.cellSize = cellSize;
|
this.cellSize = cellSize;
|
||||||
|
@ -51,8 +53,13 @@ public final class ErasureCodingPolicy {
|
||||||
this(composePolicyName(schema, cellSize), schema, cellSize, id);
|
this(composePolicyName(schema, cellSize), schema, cellSize, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ErasureCodingPolicy(ECSchema schema, int cellSize) {
|
||||||
|
this(composePolicyName(schema, cellSize), schema, cellSize, (byte) -1);
|
||||||
|
}
|
||||||
|
|
||||||
public static String composePolicyName(ECSchema schema, int cellSize) {
|
public static String composePolicyName(ECSchema schema, int cellSize) {
|
||||||
assert cellSize % 1024 == 0;
|
Preconditions.checkArgument(cellSize % 1024 == 0,
|
||||||
|
"cellSize must be 1024 aligned");
|
||||||
return schema.getCodecName().toUpperCase() + "-" +
|
return schema.getCodecName().toUpperCase() + "-" +
|
||||||
schema.getNumDataUnits() + "-" + schema.getNumParityUnits() +
|
schema.getNumDataUnits() + "-" + schema.getNumParityUnits() +
|
||||||
"-" + cellSize / 1024 + "k";
|
"-" + cellSize / 1024 + "k";
|
||||||
|
|
|
@ -49,7 +49,7 @@ import org.apache.hadoop.fs.permission.FsAction;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||||
import org.apache.hadoop.hdfs.inotify.EventBatchList;
|
import org.apache.hadoop.hdfs.inotify.EventBatchList;
|
||||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||||
|
@ -1637,7 +1637,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AddingECPolicyResponse[] addErasureCodingPolicies(
|
public AddECPolicyResponse[] addErasureCodingPolicies(
|
||||||
ErasureCodingPolicy[] policies) throws IOException {
|
ErasureCodingPolicy[] policies) throws IOException {
|
||||||
List<ErasureCodingPolicyProto> protos = Arrays.stream(policies)
|
List<ErasureCodingPolicyProto> protos = Arrays.stream(policies)
|
||||||
.map(PBHelperClient::convertErasureCodingPolicy)
|
.map(PBHelperClient::convertErasureCodingPolicy)
|
||||||
|
@ -1648,9 +1648,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
try {
|
try {
|
||||||
AddErasureCodingPoliciesResponseProto rep = rpcProxy
|
AddErasureCodingPoliciesResponseProto rep = rpcProxy
|
||||||
.addErasureCodingPolicies(null, req);
|
.addErasureCodingPolicies(null, req);
|
||||||
AddingECPolicyResponse[] responses = rep.getResponsesList().stream()
|
AddECPolicyResponse[] responses = rep.getResponsesList().stream()
|
||||||
.map(PBHelperClient::convertAddingECPolicyResponse)
|
.map(PBHelperClient::convertAddECPolicyResponse)
|
||||||
.toArray(AddingECPolicyResponse[]::new);
|
.toArray(AddECPolicyResponse[]::new);
|
||||||
return responses;
|
return responses;
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
|
|
@ -58,7 +58,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.inotify.Event;
|
import org.apache.hadoop.hdfs.inotify.Event;
|
||||||
import org.apache.hadoop.hdfs.inotify.EventBatch;
|
import org.apache.hadoop.hdfs.inotify.EventBatch;
|
||||||
import org.apache.hadoop.hdfs.inotify.EventBatchList;
|
import org.apache.hadoop.hdfs.inotify.EventBatchList;
|
||||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||||
|
@ -124,7 +124,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmS
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
|
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AddingECPolicyResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AddECPolicyResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTypeProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTypeProto;
|
||||||
|
@ -2709,10 +2709,10 @@ public class PBHelperClient {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static AddingECPolicyResponseProto convertAddingECPolicyResponse(
|
public static AddECPolicyResponseProto convertAddECPolicyResponse(
|
||||||
AddingECPolicyResponse response) {
|
AddECPolicyResponse response) {
|
||||||
AddingECPolicyResponseProto.Builder builder =
|
AddECPolicyResponseProto.Builder builder =
|
||||||
AddingECPolicyResponseProto.newBuilder()
|
AddECPolicyResponseProto.newBuilder()
|
||||||
.setPolicy(convertErasureCodingPolicy(response.getPolicy()))
|
.setPolicy(convertErasureCodingPolicy(response.getPolicy()))
|
||||||
.setSucceed(response.isSucceed());
|
.setSucceed(response.isSucceed());
|
||||||
if (!response.isSucceed()) {
|
if (!response.isSucceed()) {
|
||||||
|
@ -2721,13 +2721,13 @@ public class PBHelperClient {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static AddingECPolicyResponse convertAddingECPolicyResponse(
|
public static AddECPolicyResponse convertAddECPolicyResponse(
|
||||||
AddingECPolicyResponseProto proto) {
|
AddECPolicyResponseProto proto) {
|
||||||
ErasureCodingPolicy policy = convertErasureCodingPolicy(proto.getPolicy());
|
ErasureCodingPolicy policy = convertErasureCodingPolicy(proto.getPolicy());
|
||||||
if (proto.getSucceed()) {
|
if (proto.getSucceed()) {
|
||||||
return new AddingECPolicyResponse(policy);
|
return new AddECPolicyResponse(policy);
|
||||||
} else {
|
} else {
|
||||||
return new AddingECPolicyResponse(policy, proto.getErrorMsg());
|
return new AddECPolicyResponse(policy, proto.getErrorMsg());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -316,7 +316,7 @@ public class ECPolicyLoader {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (schema != null && cellSize > 0) {
|
if (schema != null && cellSize > 0) {
|
||||||
return new ErasureCodingPolicy(schema, cellSize, (byte) -1);
|
return new ErasureCodingPolicy(schema, cellSize);
|
||||||
} else {
|
} else {
|
||||||
throw new RuntimeException("Bad policy is found in"
|
throw new RuntimeException("Bad policy is found in"
|
||||||
+ " EC policy configuration file");
|
+ " EC policy configuration file");
|
||||||
|
|
|
@ -58,7 +58,7 @@ message AddErasureCodingPoliciesRequestProto {
|
||||||
}
|
}
|
||||||
|
|
||||||
message AddErasureCodingPoliciesResponseProto {
|
message AddErasureCodingPoliciesResponseProto {
|
||||||
repeated AddingECPolicyResponseProto responses = 1;
|
repeated AddECPolicyResponseProto responses = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message UnsetErasureCodingPolicyRequestProto {
|
message UnsetErasureCodingPolicyRequestProto {
|
||||||
|
|
|
@ -371,7 +371,7 @@ message ErasureCodingPolicyProto {
|
||||||
required uint32 id = 4; // Actually a byte - only 8 bits used
|
required uint32 id = 4; // Actually a byte - only 8 bits used
|
||||||
}
|
}
|
||||||
|
|
||||||
message AddingECPolicyResponseProto {
|
message AddECPolicyResponseProto {
|
||||||
required ErasureCodingPolicyProto policy = 1;
|
required ErasureCodingPolicyProto policy = 1;
|
||||||
required bool succeed = 2;
|
required bool succeed = 2;
|
||||||
optional string errorMsg = 3;
|
optional string errorMsg = 3;
|
||||||
|
|
|
@ -56,11 +56,11 @@ public class TestErasureCodingPolicy {
|
||||||
@Test
|
@Test
|
||||||
public void testEqualsAndHashCode() {
|
public void testEqualsAndHashCode() {
|
||||||
ErasureCodingPolicy[] policies = new ErasureCodingPolicy[]{
|
ErasureCodingPolicy[] policies = new ErasureCodingPolicy[]{
|
||||||
new ErasureCodingPolicy("one", SCHEMA_1, 321, (byte) 1),
|
new ErasureCodingPolicy("one", SCHEMA_1, 1024, (byte) 1),
|
||||||
new ErasureCodingPolicy("two", SCHEMA_1, 321, (byte) 1),
|
new ErasureCodingPolicy("two", SCHEMA_1, 1024, (byte) 1),
|
||||||
new ErasureCodingPolicy("one", SCHEMA_2, 321, (byte) 1),
|
new ErasureCodingPolicy("one", SCHEMA_2, 1024, (byte) 1),
|
||||||
new ErasureCodingPolicy("one", SCHEMA_1, 123, (byte) 1),
|
new ErasureCodingPolicy("one", SCHEMA_1, 2048, (byte) 1),
|
||||||
new ErasureCodingPolicy("one", SCHEMA_1, 321, (byte) 3),
|
new ErasureCodingPolicy("one", SCHEMA_1, 1024, (byte) 3),
|
||||||
};
|
};
|
||||||
|
|
||||||
for (int i = 0; i < policies.length; i++) {
|
for (int i = 0; i < policies.length; i++) {
|
||||||
|
|
|
@ -61,6 +61,7 @@
|
||||||
<!-- schema is referred by its id -->
|
<!-- schema is referred by its id -->
|
||||||
<schema>XORk2m1</schema>
|
<schema>XORk2m1</schema>
|
||||||
<!-- cellsize must be an positive integer multiple of 1024(1k) -->
|
<!-- cellsize must be an positive integer multiple of 1024(1k) -->
|
||||||
|
<!-- maximum cellsize is defined by 'dfs.namenode.ec.policies.max.cellsize' property -->
|
||||||
<cellsize>131072</cellsize>
|
<cellsize>131072</cellsize>
|
||||||
</policy>
|
</policy>
|
||||||
<policy>
|
<policy>
|
||||||
|
|
|
@ -570,6 +570,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
|
|
||||||
public static final String DFS_NAMENODE_EC_POLICIES_ENABLED_KEY = "dfs.namenode.ec.policies.enabled";
|
public static final String DFS_NAMENODE_EC_POLICIES_ENABLED_KEY = "dfs.namenode.ec.policies.enabled";
|
||||||
public static final String DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT = "";
|
public static final String DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT = "";
|
||||||
|
public static final String DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_KEY = "dfs.namenode.ec.policies.max.cellsize";
|
||||||
|
public static final int DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_DEFAULT = 4 * 1024 * 1024;
|
||||||
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedread.threads";
|
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedread.threads";
|
||||||
public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT = 20;
|
public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT = 20;
|
||||||
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.ec.reconstruction.stripedread.buffer.size";
|
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.ec.reconstruction.stripedread.buffer.size";
|
||||||
|
|
|
@ -37,7 +37,7 @@ import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FsServerDefaults;
|
import org.apache.hadoop.fs.FsServerDefaults;
|
||||||
import org.apache.hadoop.fs.Options.Rename;
|
import org.apache.hadoop.fs.Options.Rename;
|
||||||
import org.apache.hadoop.fs.QuotaUsage;
|
import org.apache.hadoop.fs.QuotaUsage;
|
||||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||||
|
@ -1649,11 +1649,11 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
||||||
ErasureCodingPolicy[] policies = request.getEcPoliciesList().stream()
|
ErasureCodingPolicy[] policies = request.getEcPoliciesList().stream()
|
||||||
.map(PBHelperClient::convertErasureCodingPolicy)
|
.map(PBHelperClient::convertErasureCodingPolicy)
|
||||||
.toArray(ErasureCodingPolicy[]::new);
|
.toArray(ErasureCodingPolicy[]::new);
|
||||||
AddingECPolicyResponse[] result = server
|
AddECPolicyResponse[] result = server
|
||||||
.addErasureCodingPolicies(policies);
|
.addErasureCodingPolicies(policies);
|
||||||
|
|
||||||
List<HdfsProtos.AddingECPolicyResponseProto> responseProtos = Arrays
|
List<HdfsProtos.AddECPolicyResponseProto> responseProtos = Arrays
|
||||||
.stream(result).map(PBHelperClient::convertAddingECPolicyResponse)
|
.stream(result).map(PBHelperClient::convertAddECPolicyResponse)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
AddErasureCodingPoliciesResponseProto response =
|
AddErasureCodingPoliciesResponseProto response =
|
||||||
AddErasureCodingPoliciesResponseProto.newBuilder()
|
AddErasureCodingPoliciesResponseProto.newBuilder()
|
||||||
|
|
|
@ -25,7 +25,10 @@ import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
|
||||||
import java.util.List;
|
import org.apache.hadoop.io.erasurecode.CodecUtil;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -40,7 +43,12 @@ import java.util.stream.Stream;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate({"HDFS"})
|
@InterfaceAudience.LimitedPrivate({"HDFS"})
|
||||||
public final class ErasureCodingPolicyManager {
|
public final class ErasureCodingPolicyManager {
|
||||||
private static final byte USER_DEFINED_POLICY_START_ID = 32;
|
|
||||||
|
public static Logger LOG = LoggerFactory.getLogger(
|
||||||
|
ErasureCodingPolicyManager.class);
|
||||||
|
private static final byte USER_DEFINED_POLICY_START_ID = (byte) 64;
|
||||||
|
private int maxCellSize =
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_DEFAULT;
|
||||||
|
|
||||||
// Supported storage policies for striped EC files
|
// Supported storage policies for striped EC files
|
||||||
private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE =
|
private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE =
|
||||||
|
@ -77,10 +85,6 @@ public final class ErasureCodingPolicyManager {
|
||||||
private ErasureCodingPolicyManager() {}
|
private ErasureCodingPolicyManager() {}
|
||||||
|
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
this.loadPolicies(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void loadPolicies(Configuration conf) {
|
|
||||||
// Populate the list of enabled policies from configuration
|
// Populate the list of enabled policies from configuration
|
||||||
final String[] policyNames = conf.getTrimmedStrings(
|
final String[] policyNames = conf.getTrimmedStrings(
|
||||||
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
||||||
|
@ -95,20 +99,30 @@ public final class ErasureCodingPolicyManager {
|
||||||
ErasureCodingPolicy ecPolicy =
|
ErasureCodingPolicy ecPolicy =
|
||||||
SystemErasureCodingPolicies.getByName(policyName);
|
SystemErasureCodingPolicies.getByName(policyName);
|
||||||
if (ecPolicy == null) {
|
if (ecPolicy == null) {
|
||||||
String sysPolicies = SystemErasureCodingPolicies.getPolicies().stream()
|
ecPolicy = userPoliciesByName.get(policyName);
|
||||||
|
if (ecPolicy == null) {
|
||||||
|
String allPolicies = SystemErasureCodingPolicies.getPolicies()
|
||||||
|
.stream().map(ErasureCodingPolicy::getName)
|
||||||
|
.collect(Collectors.joining(", ")) + ", " +
|
||||||
|
userPoliciesByName.values().stream()
|
||||||
.map(ErasureCodingPolicy::getName)
|
.map(ErasureCodingPolicy::getName)
|
||||||
.collect(Collectors.joining(", "));
|
.collect(Collectors.joining(", "));
|
||||||
String msg = String.format("EC policy '%s' specified at %s is not a " +
|
String msg = String.format("EC policy '%s' specified at %s is not a "
|
||||||
"valid policy. Please choose from list of available policies: " +
|
+ "valid policy. Please choose from list of available "
|
||||||
"[%s]",
|
+ "policies: [%s]",
|
||||||
policyName,
|
policyName,
|
||||||
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
||||||
sysPolicies);
|
allPolicies);
|
||||||
throw new IllegalArgumentException(msg);
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
enabledPoliciesByName.put(ecPolicy.getName(), ecPolicy);
|
enabledPoliciesByName.put(ecPolicy.getName(), ecPolicy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
maxCellSize = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_KEY,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_DEFAULT);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TODO: HDFS-7859 persist into NameNode
|
* TODO: HDFS-7859 persist into NameNode
|
||||||
* load persistent policies from image and editlog, which is done only once
|
* load persistent policies from image and editlog, which is done only once
|
||||||
|
@ -153,9 +167,10 @@ public final class ErasureCodingPolicyManager {
|
||||||
* Get all system defined policies and user defined policies.
|
* Get all system defined policies and user defined policies.
|
||||||
* @return all policies
|
* @return all policies
|
||||||
*/
|
*/
|
||||||
public List<ErasureCodingPolicy> getPolicies() {
|
public ErasureCodingPolicy[] getPolicies() {
|
||||||
return Stream.concat(SystemErasureCodingPolicies.getPolicies().stream(),
|
return Stream.concat(SystemErasureCodingPolicies.getPolicies().stream(),
|
||||||
this.userPoliciesByID.values().stream()).collect(Collectors.toList());
|
userPoliciesByName.values().stream())
|
||||||
|
.toArray(ErasureCodingPolicy[]::new);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -190,24 +205,37 @@ public final class ErasureCodingPolicyManager {
|
||||||
// This is a placeholder for HDFS-7337.
|
// This is a placeholder for HDFS-7337.
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void addPolicy(ErasureCodingPolicy policy)
|
public synchronized ErasureCodingPolicy addPolicy(ErasureCodingPolicy policy)
|
||||||
throws IllegalECPolicyException {
|
throws IllegalECPolicyException {
|
||||||
|
if (!CodecUtil.hasCodec(policy.getCodecName())) {
|
||||||
|
throw new IllegalECPolicyException("Codec name "
|
||||||
|
+ policy.getCodecName() + " is not supported");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (policy.getCellSize() > maxCellSize) {
|
||||||
|
throw new IllegalECPolicyException("Cell size " + policy.getCellSize()
|
||||||
|
+ " should not exceed maximum " + maxCellSize + " byte");
|
||||||
|
}
|
||||||
|
|
||||||
String assignedNewName = ErasureCodingPolicy.composePolicyName(
|
String assignedNewName = ErasureCodingPolicy.composePolicyName(
|
||||||
policy.getSchema(), policy.getCellSize());
|
policy.getSchema(), policy.getCellSize());
|
||||||
for (ErasureCodingPolicy p : getPolicies()) {
|
for (ErasureCodingPolicy p : getPolicies()) {
|
||||||
if (p.getName().equals(assignedNewName)) {
|
if (p.getName().equals(assignedNewName)) {
|
||||||
throw new IllegalECPolicyException("The policy name already exists");
|
throw new IllegalECPolicyException("The policy name " + assignedNewName
|
||||||
|
+ " already exists");
|
||||||
}
|
}
|
||||||
if (p.getSchema().equals(policy.getSchema()) &&
|
if (p.getSchema().equals(policy.getSchema()) &&
|
||||||
p.getCellSize() == policy.getCellSize()) {
|
p.getCellSize() == policy.getCellSize()) {
|
||||||
throw new IllegalECPolicyException("A policy with same schema and " +
|
throw new IllegalECPolicyException("A policy with same schema "
|
||||||
"cell size already exists");
|
+ policy.getSchema().toString() + " and cell size "
|
||||||
|
+ p.getCellSize() + " is already exists");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
policy.setName(assignedNewName);
|
policy.setName(assignedNewName);
|
||||||
policy.setId(getNextAvailablePolicyID());
|
policy.setId(getNextAvailablePolicyID());
|
||||||
this.userPoliciesByName.put(policy.getName(), policy);
|
this.userPoliciesByName.put(policy.getName(), policy);
|
||||||
this.userPoliciesByID.put(policy.getId(), policy);
|
this.userPoliciesByID.put(policy.getId(), policy);
|
||||||
|
return policy;
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte getNextAvailablePolicyID() {
|
private byte getNextAvailablePolicyID() {
|
||||||
|
|
|
@ -212,10 +212,10 @@ final class FSDirErasureCodingOp {
|
||||||
return fsd.getAuditFileInfo(iip);
|
return fsd.getAuditFileInfo(iip);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addErasureCodePolicy(final FSNamesystem fsn,
|
static ErasureCodingPolicy addErasureCodePolicy(final FSNamesystem fsn,
|
||||||
ErasureCodingPolicy policy) throws IllegalECPolicyException {
|
ErasureCodingPolicy policy) throws IllegalECPolicyException {
|
||||||
Preconditions.checkNotNull(policy);
|
Preconditions.checkNotNull(policy);
|
||||||
fsn.getErasureCodingPolicyManager().addPolicy(policy);
|
return fsn.getErasureCodingPolicyManager().addPolicy(policy);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<XAttr> removeErasureCodingPolicyXAttr(
|
private static List<XAttr> removeErasureCodingPolicyXAttr(
|
||||||
|
|
|
@ -88,6 +88,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
||||||
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
|
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||||
import static org.apache.hadoop.util.Time.now;
|
import static org.apache.hadoop.util.Time.now;
|
||||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||||
|
@ -175,7 +176,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HAUtil;
|
import org.apache.hadoop.hdfs.HAUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
|
import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
|
||||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
|
||||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||||
|
@ -6847,21 +6848,33 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
* @param policies The policies to add.
|
* @param policies The policies to add.
|
||||||
* @return The according result of add operation.
|
* @return The according result of add operation.
|
||||||
*/
|
*/
|
||||||
AddingECPolicyResponse[] addECPolicies(ErasureCodingPolicy[] policies)
|
AddECPolicyResponse[] addECPolicies(ErasureCodingPolicy[] policies)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
final String operationName = "addECPolicies";
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
List<AddingECPolicyResponse> responses = new ArrayList<>();
|
List<AddECPolicyResponse> responses = new ArrayList<>();
|
||||||
|
boolean success = false;
|
||||||
writeLock();
|
writeLock();
|
||||||
|
try {
|
||||||
|
checkOperation(OperationCategory.WRITE);
|
||||||
for (ErasureCodingPolicy policy : policies) {
|
for (ErasureCodingPolicy policy : policies) {
|
||||||
try {
|
try {
|
||||||
|
ErasureCodingPolicy newPolicy =
|
||||||
FSDirErasureCodingOp.addErasureCodePolicy(this, policy);
|
FSDirErasureCodingOp.addErasureCodePolicy(this, policy);
|
||||||
responses.add(new AddingECPolicyResponse(policy));
|
responses.add(new AddECPolicyResponse(newPolicy));
|
||||||
} catch (IllegalECPolicyException e) {
|
} catch (IllegalECPolicyException e) {
|
||||||
responses.add(new AddingECPolicyResponse(policy, e));
|
responses.add(new AddECPolicyResponse(policy, e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
writeUnlock("addECPolicies");
|
success = true;
|
||||||
return responses.toArray(new AddingECPolicyResponse[0]);
|
return responses.toArray(new AddECPolicyResponse[0]);
|
||||||
|
} finally {
|
||||||
|
writeUnlock(operationName);
|
||||||
|
if (success) {
|
||||||
|
getEditLog().logSync();
|
||||||
|
}
|
||||||
|
logAuditEvent(success, operationName, null, null, null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -84,7 +84,7 @@ import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
||||||
import org.apache.hadoop.hdfs.inotify.EventBatch;
|
import org.apache.hadoop.hdfs.inotify.EventBatch;
|
||||||
import org.apache.hadoop.hdfs.inotify.EventBatchList;
|
import org.apache.hadoop.hdfs.inotify.EventBatchList;
|
||||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
|
||||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
|
@ -2270,9 +2270,10 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AddingECPolicyResponse[] addErasureCodingPolicies(
|
public AddECPolicyResponse[] addErasureCodingPolicies(
|
||||||
ErasureCodingPolicy[] policies) throws IOException {
|
ErasureCodingPolicy[] policies) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkSuperuserPrivilege();
|
||||||
return namesystem.addECPolicies(policies);
|
return namesystem.addECPolicies(policies);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.util.ECPolicyLoader;
|
import org.apache.hadoop.hdfs.util.ECPolicyLoader;
|
||||||
import org.apache.hadoop.tools.TableListing;
|
import org.apache.hadoop.tools.TableListing;
|
||||||
|
@ -182,9 +182,9 @@ public class ECAdmin extends Configured implements Tool {
|
||||||
List<ErasureCodingPolicy> policies =
|
List<ErasureCodingPolicy> policies =
|
||||||
new ECPolicyLoader().loadPolicy(filePath);
|
new ECPolicyLoader().loadPolicy(filePath);
|
||||||
if (policies.size() > 0) {
|
if (policies.size() > 0) {
|
||||||
AddingECPolicyResponse[] responses = dfs.addErasureCodingPolicies(
|
AddECPolicyResponse[] responses = dfs.addErasureCodingPolicies(
|
||||||
policies.toArray(new ErasureCodingPolicy[policies.size()]));
|
policies.toArray(new ErasureCodingPolicy[policies.size()]));
|
||||||
for (AddingECPolicyResponse response : responses) {
|
for (AddECPolicyResponse response : responses) {
|
||||||
System.out.println(response);
|
System.out.println(response);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -2953,6 +2953,13 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.ec.policies.max.cellsize</name>
|
||||||
|
<value>4194304</value>
|
||||||
|
<description>The maximum cell size of erasure coding policy. Default is 4MB.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.datanode.ec.reconstruction.stripedread.timeout.millis</name>
|
<name>dfs.datanode.ec.reconstruction.stripedread.timeout.millis</name>
|
||||||
<value>5000</value>
|
<value>5000</value>
|
||||||
|
|
|
@ -154,6 +154,7 @@ Deployment
|
||||||
[-getPolicy -path <path>]
|
[-getPolicy -path <path>]
|
||||||
[-unsetPolicy -path <path>]
|
[-unsetPolicy -path <path>]
|
||||||
[-listPolicies]
|
[-listPolicies]
|
||||||
|
[-addPolicies -policyFile <file>]
|
||||||
[-listCodecs]
|
[-listCodecs]
|
||||||
[-usage [cmd ...]]
|
[-usage [cmd ...]]
|
||||||
[-help [cmd ...]]
|
[-help [cmd ...]]
|
||||||
|
@ -182,7 +183,7 @@ Below are the details about each command.
|
||||||
|
|
||||||
* `[-addPolicies -policyFile <file>]`
|
* `[-addPolicies -policyFile <file>]`
|
||||||
|
|
||||||
Add a list of erasure coding policies. Please refer etc/hadoop/user_ec_policies.xml.template for the example policy file.
|
Add a list of erasure coding policies. Please refer etc/hadoop/user_ec_policies.xml.template for the example policy file. The maximum cell size is defined in property 'dfs.namenode.ec.policies.max.cellsize' with the default value 4MB.
|
||||||
|
|
||||||
* `[-listCodecs]`
|
* `[-listCodecs]`
|
||||||
|
|
||||||
|
|
|
@ -49,8 +49,6 @@ import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
|
@ -77,18 +75,14 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
|
||||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.ScriptBasedMapping;
|
import org.apache.hadoop.net.ScriptBasedMapping;
|
||||||
|
@ -1449,37 +1443,4 @@ public class TestDistributedFileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testAddErasureCodingPolicies() throws Exception {
|
|
||||||
Configuration conf = getTestConfiguration();
|
|
||||||
MiniDFSCluster cluster = null;
|
|
||||||
|
|
||||||
try {
|
|
||||||
ErasureCodingPolicy policy1 =
|
|
||||||
SystemErasureCodingPolicies.getPolicies().get(0);
|
|
||||||
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
|
||||||
Stream.of(policy1).map(ErasureCodingPolicy::getName)
|
|
||||||
.collect(Collectors.joining(", ")));
|
|
||||||
|
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
||||||
DistributedFileSystem fs = cluster.getFileSystem();
|
|
||||||
|
|
||||||
ECSchema toAddSchema = new ECSchema("testcodec", 3, 2);
|
|
||||||
ErasureCodingPolicy toAddPolicy =
|
|
||||||
new ErasureCodingPolicy(toAddSchema, 128 * 1024, (byte) 254);
|
|
||||||
ErasureCodingPolicy[] policies = new ErasureCodingPolicy[]{
|
|
||||||
policy1, toAddPolicy};
|
|
||||||
AddingECPolicyResponse[] responses =
|
|
||||||
fs.addErasureCodingPolicies(policies);
|
|
||||||
assertEquals(2, responses.length);
|
|
||||||
assertFalse(responses[0].isSucceed());
|
|
||||||
assertTrue(responses[1].isSucceed());
|
|
||||||
assertTrue(responses[1].getPolicy().getId() > 0);
|
|
||||||
} finally {
|
|
||||||
if (cluster != null) {
|
|
||||||
cluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,10 +23,12 @@ import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
|
||||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
||||||
|
@ -639,4 +641,56 @@ public class TestErasureCodingPolicies {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddErasureCodingPolicies() throws Exception {
|
||||||
|
// Test nonexistent codec name
|
||||||
|
ECSchema toAddSchema = new ECSchema("testcodec", 3, 2);
|
||||||
|
ErasureCodingPolicy newPolicy =
|
||||||
|
new ErasureCodingPolicy(toAddSchema, 128 * 1024);
|
||||||
|
ErasureCodingPolicy[] policyArray = new ErasureCodingPolicy[]{newPolicy};
|
||||||
|
AddECPolicyResponse[] responses =
|
||||||
|
fs.addErasureCodingPolicies(policyArray);
|
||||||
|
assertEquals(1, responses.length);
|
||||||
|
assertFalse(responses[0].isSucceed());
|
||||||
|
|
||||||
|
// Test too big cell size
|
||||||
|
toAddSchema = new ECSchema("rs", 3, 2);
|
||||||
|
newPolicy =
|
||||||
|
new ErasureCodingPolicy(toAddSchema, 128 * 1024 * 1024);
|
||||||
|
policyArray = new ErasureCodingPolicy[]{newPolicy};
|
||||||
|
responses = fs.addErasureCodingPolicies(policyArray);
|
||||||
|
assertEquals(1, responses.length);
|
||||||
|
assertFalse(responses[0].isSucceed());
|
||||||
|
|
||||||
|
// Test other invalid cell size
|
||||||
|
toAddSchema = new ECSchema("rs", 3, 2);
|
||||||
|
int[] cellSizes = {0, -1, 1023};
|
||||||
|
for (int cellSize: cellSizes) {
|
||||||
|
try {
|
||||||
|
new ErasureCodingPolicy(toAddSchema, cellSize);
|
||||||
|
Assert.fail("Invalid cell size should be detected.");
|
||||||
|
} catch (Exception e){
|
||||||
|
GenericTestUtils.assertExceptionContains("cellSize must be", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test duplicate policy
|
||||||
|
ErasureCodingPolicy policy0 =
|
||||||
|
SystemErasureCodingPolicies.getPolicies().get(0);
|
||||||
|
policyArray = new ErasureCodingPolicy[]{policy0};
|
||||||
|
responses = fs.addErasureCodingPolicies(policyArray);
|
||||||
|
assertEquals(1, responses.length);
|
||||||
|
assertFalse(responses[0].isSucceed());
|
||||||
|
|
||||||
|
// Test add policy successfully
|
||||||
|
newPolicy =
|
||||||
|
new ErasureCodingPolicy(toAddSchema, 1 * 1024 * 1024);
|
||||||
|
policyArray = new ErasureCodingPolicy[]{newPolicy};
|
||||||
|
responses = fs.addErasureCodingPolicies(policyArray);
|
||||||
|
assertEquals(1, responses.length);
|
||||||
|
assertTrue(responses[0].isSucceed());
|
||||||
|
assertEquals(SystemErasureCodingPolicies.getPolicies().size() + 1,
|
||||||
|
ErasureCodingPolicyManager.getInstance().getPolicies().length);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.protocolPB;
|
||||||
|
|
||||||
|
|
||||||
import com.google.protobuf.UninitializedMessageException;
|
import com.google.protobuf.UninitializedMessageException;
|
||||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
|
||||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||||
|
|
||||||
|
@ -913,14 +913,14 @@ public class TestPBHelper {
|
||||||
// Check conversion of the built-in policies.
|
// Check conversion of the built-in policies.
|
||||||
for (ErasureCodingPolicy policy :
|
for (ErasureCodingPolicy policy :
|
||||||
SystemErasureCodingPolicies.getPolicies()) {
|
SystemErasureCodingPolicies.getPolicies()) {
|
||||||
AddingECPolicyResponse response = new AddingECPolicyResponse(policy);
|
AddECPolicyResponse response = new AddECPolicyResponse(policy);
|
||||||
HdfsProtos.AddingECPolicyResponseProto proto = PBHelperClient
|
HdfsProtos.AddECPolicyResponseProto proto = PBHelperClient
|
||||||
.convertAddingECPolicyResponse(response);
|
.convertAddECPolicyResponse(response);
|
||||||
// Optional fields should not be set.
|
// Optional fields should not be set.
|
||||||
assertFalse("Unnecessary field is set.", proto.hasErrorMsg());
|
assertFalse("Unnecessary field is set.", proto.hasErrorMsg());
|
||||||
// Convert proto back to an object and check for equality.
|
// Convert proto back to an object and check for equality.
|
||||||
AddingECPolicyResponse convertedResponse = PBHelperClient
|
AddECPolicyResponse convertedResponse = PBHelperClient
|
||||||
.convertAddingECPolicyResponse(proto);
|
.convertAddECPolicyResponse(proto);
|
||||||
assertEquals("Converted policy not equal", response.getPolicy(),
|
assertEquals("Converted policy not equal", response.getPolicy(),
|
||||||
convertedResponse.getPolicy());
|
convertedResponse.getPolicy());
|
||||||
assertEquals("Converted policy not equal", response.isSucceed(),
|
assertEquals("Converted policy not equal", response.isSucceed(),
|
||||||
|
@ -929,13 +929,13 @@ public class TestPBHelper {
|
||||||
|
|
||||||
ErasureCodingPolicy policy = SystemErasureCodingPolicies
|
ErasureCodingPolicy policy = SystemErasureCodingPolicies
|
||||||
.getPolicies().get(0);
|
.getPolicies().get(0);
|
||||||
AddingECPolicyResponse response =
|
AddECPolicyResponse response =
|
||||||
new AddingECPolicyResponse(policy, "failed");
|
new AddECPolicyResponse(policy, "failed");
|
||||||
HdfsProtos.AddingECPolicyResponseProto proto = PBHelperClient
|
HdfsProtos.AddECPolicyResponseProto proto = PBHelperClient
|
||||||
.convertAddingECPolicyResponse(response);
|
.convertAddECPolicyResponse(response);
|
||||||
// Convert proto back to an object and check for equality.
|
// Convert proto back to an object and check for equality.
|
||||||
AddingECPolicyResponse convertedResponse = PBHelperClient
|
AddECPolicyResponse convertedResponse = PBHelperClient
|
||||||
.convertAddingECPolicyResponse(proto);
|
.convertAddECPolicyResponse(proto);
|
||||||
assertEquals("Converted policy not equal", response.getPolicy(),
|
assertEquals("Converted policy not equal", response.getPolicy(),
|
||||||
convertedResponse.getPolicy());
|
convertedResponse.getPolicy());
|
||||||
assertEquals("Converted policy not equal", response.getErrorMsg(),
|
assertEquals("Converted policy not equal", response.getErrorMsg(),
|
||||||
|
@ -961,7 +961,7 @@ public class TestPBHelper {
|
||||||
// Check conversion of a non-built-in policy.
|
// Check conversion of a non-built-in policy.
|
||||||
ECSchema newSchema = new ECSchema("testcodec", 3, 2);
|
ECSchema newSchema = new ECSchema("testcodec", 3, 2);
|
||||||
ErasureCodingPolicy newPolicy =
|
ErasureCodingPolicy newPolicy =
|
||||||
new ErasureCodingPolicy(newSchema, 128 * 1024, (byte) 254);
|
new ErasureCodingPolicy(newSchema, 128 * 1024);
|
||||||
HdfsProtos.ErasureCodingPolicyProto proto = PBHelperClient
|
HdfsProtos.ErasureCodingPolicyProto proto = PBHelperClient
|
||||||
.convertErasureCodingPolicy(newPolicy);
|
.convertErasureCodingPolicy(newPolicy);
|
||||||
// Optional fields should be set.
|
// Optional fields should be set.
|
||||||
|
|
Loading…
Reference in New Issue