From cb672a45a0bbd8950b9b5e304c2e03f516945903 Mon Sep 17 00:00:00 2001
From: Kai Zheng
Date: Fri, 28 Apr 2017 13:18:50 +0800
Subject: [PATCH] HDFS-11605. Allow user to customize new erasure code
 policies. Contributed by Huafeng Wang

---
 .../hadoop/io/erasurecode/ECSchema.java        |  1 +
 .../org/apache/hadoop/hdfs/DFSClient.java      |  7 ++
 .../hadoop/hdfs/DistributedFileSystem.java     | 13 +++
 .../hdfs/protocol/AddingECPolicyResponse.java  | 66 +++++++++++++
 .../hadoop/hdfs/protocol/ClientProtocol.java   | 11 +++
 .../hdfs/protocol/ErasureCodingPolicy.java     | 14 ++-
 .../protocol/IllegalECPolicyException.java     | 34 +++++++
 .../ClientNamenodeProtocolTranslatorPB.java    | 25 +++++
 .../hdfs/protocolPB/PBHelperClient.java        | 24 +++++
 .../hadoop/hdfs/util/ECPolicyLoader.java       | 25 +++--
 .../main/proto/ClientNamenodeProtocol.proto    |  2 +
 .../src/main/proto/erasurecoding.proto         |  8 ++
 .../src/main/proto/hdfs.proto                  |  6 ++
 ...amenodeProtocolServerSideTranslatorPB.java  | 31 +++++-
 .../namenode/ErasureCodingPolicyManager.java   | 95 ++++++++++++++++++-
 .../server/namenode/FSDirErasureCodingOp.java  | 13 ++-
 .../server/namenode/FSImageFormatPBINode.java  |  3 +-
 .../hdfs/server/namenode/FSNamesystem.java     | 36 +++++--
 .../hdfs/server/namenode/INodeFile.java        |  7 +-
 .../server/namenode/NameNodeRpcServer.java     |  8 ++
 .../org/apache/hadoop/hdfs/tools/ECAdmin.java  | 63 ++++++++++++
 .../src/site/markdown/HDFSErasureCoding.md     |  4 +
 .../hdfs/TestDistributedFileSystem.java        | 39 ++++++++
 .../hadoop/hdfs/protocolPB/TestPBHelper.java   | 35 +++++++
 .../namenode/TestEnabledECPolicies.java        | 11 ++-
 .../test/resources/testErasureCodingConf.xml   | 68 +++++++++++++
 .../src/test/resources/test_ec_policies.xml    | 65 +++++++++++++
 27 files changed, 674 insertions(+), 40 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AddingECPolicyResponse.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/IllegalECPolicyException.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/resources/test_ec_policies.xml

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
index 1f11757215c..e55fbdd8768 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
@@ -199,6 +199,7 @@ public final class ECSchema {
     return sb.toString();
   }
 
+  // Todo: Further use `extraOptions` to compare ECSchemas
   @Override
   public boolean equals(Object o) {
     if (o == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 7773891cb3f..187d2e90841 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -101,6 +101,7 @@ import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.AclException;
+import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -2763,6 +2764,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  public AddingECPolicyResponse[] addErasureCodingPolicies(
+      ErasureCodingPolicy[] policies) throws IOException {
+    checkOpen();
+    return namenode.addErasureCodingPolicies(policies);
+  }
+
   public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
     checkOpen();
     return new DFSInotifyEventInputStream(namenode, tracer);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index cfe7240a4b0..429f4c2384a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
+import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -2530,6 +2531,18 @@ public class DistributedFileSystem extends FileSystem {
     return Arrays.asList(dfs.getErasureCodingPolicies());
   }
 
+  /**
+   * Add Erasure coding policies to HDFS.
+   *
+   * @param policies The user-defined EC policy list to add.
+   * @return The response list of the add operations.
+   * @throws IOException
+   */
+  public AddingECPolicyResponse[] addErasureCodingPolicies(
+      ErasureCodingPolicy[] policies) throws IOException {
+    return dfs.addErasureCodingPolicies(policies);
+  }
+
   /**
    * Unset the erasure coding policy from the source path.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AddingECPolicyResponse.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AddingECPolicyResponse.java
new file mode 100644
index 00000000000..ab39f09383b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AddingECPolicyResponse.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+/**
+ * A response of adding an ErasureCoding policy.
+ */
+public class AddingECPolicyResponse {
+  private boolean succeed;
+  private ErasureCodingPolicy policy;
+  private String errorMsg;
+
+  public AddingECPolicyResponse(ErasureCodingPolicy policy) {
+    this.policy = policy;
+    this.succeed = true;
+  }
+
+  public AddingECPolicyResponse(ErasureCodingPolicy policy,
+      String errorMsg) {
+    this.policy = policy;
+    this.errorMsg = errorMsg;
+    this.succeed = false;
+  }
+
+  public AddingECPolicyResponse(ErasureCodingPolicy policy,
+      IllegalECPolicyException e) {
+    this(policy, e.getMessage());
+  }
+
+  public boolean isSucceed() {
+    return succeed;
+  }
+
+  public ErasureCodingPolicy getPolicy() {
+    return policy;
+  }
+
+  public String getErrorMsg() {
+    return errorMsg;
+  }
+
+  @Override
+  public String toString() {
+    if (isSucceed()) {
+      return "Add ErasureCodingPolicy " + getPolicy().getName() + " succeed.";
+    } else {
+      return "Add ErasureCodingPolicy " + getPolicy().getName() + " failed and "
+          + "error message is " + getErrorMsg();
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 6db37b8f05f..117b9dd8e44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -1520,6 +1520,17 @@ public interface ClientProtocol {
   void setErasureCodingPolicy(String src, String ecPolicyName)
       throws IOException;
 
+  /**
+   * Add Erasure coding policies.
+   *
+   * @param policies The user-defined EC policy list to add.
+   * @return The response list of the add operations.
+   * @throws IOException
+   */
+  @AtMostOnce
+  AddingECPolicyResponse[] addErasureCodingPolicies(
+      ErasureCodingPolicy[] policies) throws IOException;
+
   /**
    * Get the erasure coding policies loaded in Namenode
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ErasureCodingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ErasureCodingPolicy.java
index 9f485f0d57c..99bc4e6ba9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ErasureCodingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ErasureCodingPolicy.java
@@ -31,10 +31,10 @@ import org.apache.hadoop.io.erasurecode.ECSchema;
 @InterfaceStability.Evolving
 public final class ErasureCodingPolicy {
 
-  private final String name;
   private final ECSchema schema;
   private final int cellSize;
-  private final byte id;
+  private String name;
+  private byte id;
 
   public ErasureCodingPolicy(String name, ECSchema schema,
       int cellSize, byte id) {
@@ -51,7 +51,7 @@ public final class ErasureCodingPolicy {
     this(composePolicyName(schema, cellSize), schema, cellSize, id);
   }
 
-  private static String composePolicyName(ECSchema schema, int cellSize) {
+  public static String composePolicyName(ECSchema schema, int cellSize) {
     assert cellSize % 1024 == 0;
     return schema.getCodecName().toUpperCase() + "-" +
         schema.getNumDataUnits() + "-" + schema.getNumParityUnits() +
@@ -62,6 +62,10 @@ public final class ErasureCodingPolicy {
     return name;
   }
 
+  public void setName(String name) {
+    this.name = name;
+  }
+
   public ECSchema getSchema() {
     return schema;
   }
@@ -86,6 +90,10 @@ public final class ErasureCodingPolicy {
     return id;
   }
 
+  public void setId(byte id) {
+    this.id = id;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (o == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/IllegalECPolicyException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/IllegalECPolicyException.java
new file mode 100644
index 00000000000..03ce2a513a0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/IllegalECPolicyException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An exception indicating an error when adding an ErasureCoding policy.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class IllegalECPolicyException extends Exception {
+  static final long serialVersionUID = 1L;
+
+  public IllegalECPolicyException(String msg) {
+    super(msg);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index c3708f98785..4df21234e0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import com.google.common.collect.Lists;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -47,6 +48,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
+import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -168,6 +170,8 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncrypt
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPolicyRequestProto;
@@ -1624,6 +1628,27 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
+  @Override
+  public AddingECPolicyResponse[] addErasureCodingPolicies(
+      ErasureCodingPolicy[] policies) throws IOException {
+    List<ErasureCodingPolicyProto> protos = Arrays.stream(policies)
+        .map(PBHelperClient::convertErasureCodingPolicy)
+        .collect(Collectors.toList());
+    AddErasureCodingPoliciesRequestProto req =
+        AddErasureCodingPoliciesRequestProto.newBuilder()
+            .addAllEcPolicies(protos).build();
+    try {
+      AddErasureCodingPoliciesResponseProto rep = rpcProxy
+          .addErasureCodingPolicies(null, req);
+      AddingECPolicyResponse[] responses = rep.getResponsesList().stream()
+          .map(PBHelperClient::convertAddingECPolicyResponse)
+          .toArray(AddingECPolicyResponse[]::new);
+      return responses;
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public ErasureCodingPolicy[] getErasureCodingPolicies() throws IOException {
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index dd552035d94..6ca35417540 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.inotify.Event;
 import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
+import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.BlockType;
@@ -123,6 +124,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmS
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AddingECPolicyResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTypeProto;
@@ -2679,6 +2681,28 @@ public class PBHelperClient {
     return builder.build();
   }
 
+  public static AddingECPolicyResponseProto convertAddingECPolicyResponse(
+      AddingECPolicyResponse response) {
+    AddingECPolicyResponseProto.Builder builder =
+        AddingECPolicyResponseProto.newBuilder()
+            .setPolicy(convertErasureCodingPolicy(response.getPolicy()))
+            .setSucceed(response.isSucceed());
+    if (!response.isSucceed()) {
+      builder.setErrorMsg(response.getErrorMsg());
+    }
+    return builder.build();
+  }
+
+  public static AddingECPolicyResponse convertAddingECPolicyResponse(
+      AddingECPolicyResponseProto proto) {
+    ErasureCodingPolicy policy = convertErasureCodingPolicy(proto.getPolicy());
+    if (proto.getSucceed()) {
+      return new AddingECPolicyResponse(policy);
+    } else {
+      return new AddingECPolicyResponse(policy, proto.getErrorMsg());
+    }
+  }
+
   public static HdfsProtos.DatanodeInfosProto convertToProto(
       DatanodeInfo[] datanodeInfos) {
     HdfsProtos.DatanodeInfosProto.Builder builder =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ECPolicyLoader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ECPolicyLoader.java
index e75f0917858..02ae25596bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ECPolicyLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ECPolicyLoader.java
@@ -32,6 +32,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 import java.io.File;
 import java.io.IOException;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Map;
 import java.util.List;
@@ -59,17 +60,16 @@ public class ECPolicyLoader {
    * @return all valid EC policies in EC policy file
    */
   public List<ErasureCodingPolicy> loadPolicy(String policyFilePath) {
-    File policyFile = getPolicyFile(policyFilePath);
-    if (policyFile == null) {
-      LOG.warn("Not found any EC policy file");
-      return Collections.emptyList();
-    }
-
     try {
+      File policyFile = getPolicyFile(policyFilePath);
+      if (!policyFile.exists()) {
+        LOG.warn("Not found any EC policy file");
+        return Collections.emptyList();
+      }
       return loadECPolicies(policyFile);
     } catch (ParserConfigurationException | IOException | SAXException e) {
       throw new RuntimeException("Failed to load EC policy file: "
-          + policyFile);
+          + policyFilePath);
     }
   }
 
@@ -220,15 +220,12 @@ public class ECPolicyLoader {
    * @param policyFilePath path of EC policy file
    * @return EC policy file
    */
-  private File getPolicyFile(String policyFilePath) {
+  private File getPolicyFile(String policyFilePath)
+      throws MalformedURLException {
     File policyFile = new File(policyFilePath);
     if (!policyFile.isAbsolute()) {
-      URL url = Thread.currentThread().getContextClassLoader()
-          .getResource(policyFilePath);
-      if (url == null) {
-        LOG.warn(policyFilePath + " not found on the classpath.");
-        policyFile = null;
-      } else if (!url.getProtocol().equalsIgnoreCase("file")) {
+      URL url = new URL(policyFilePath);
+      if (!url.getProtocol().equalsIgnoreCase("file")) {
         throw new RuntimeException(
             "EC policy file " + url
                 + " found on the classpath is not on the local filesystem.");
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index 44f1c3373b2..b8bd6bfa3d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -907,6 +907,8 @@ service ClientNamenodeProtocol {
       returns(GetEditsFromTxidResponseProto);
   rpc getErasureCodingPolicies(GetErasureCodingPoliciesRequestProto)
       returns(GetErasureCodingPoliciesResponseProto);
+  rpc addErasureCodingPolicies(AddErasureCodingPoliciesRequestProto)
+      returns(AddErasureCodingPoliciesResponseProto);
   rpc getErasureCodingPolicy(GetErasureCodingPolicyRequestProto)
       returns(GetErasureCodingPolicyResponseProto);
   rpc getQuotaUsage(GetQuotaUsageRequestProto)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto
index 4eab4d34ade..03497e62794 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto
@@ -46,6 +46,14 @@ message GetErasureCodingPolicyResponseProto {
   optional ErasureCodingPolicyProto ecPolicy = 1;
 }
 
+message AddErasureCodingPoliciesRequestProto {
+  repeated ErasureCodingPolicyProto ecPolicies = 1;
+}
+
+message AddErasureCodingPoliciesResponseProto {
+  repeated AddingECPolicyResponseProto responses = 1;
+}
+
 message UnsetErasureCodingPolicyRequestProto {
   required string src = 1;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 1be92d8c11a..3e274278311 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -371,6 +371,12 @@ message ErasureCodingPolicyProto {
   required uint32 id = 4; // Actually a byte - only 8 bits used
 }
 
+message AddingECPolicyResponseProto {
+  required ErasureCodingPolicyProto policy = 1;
+  required bool succeed = 2;
+  optional string errorMsg = 3;
+}
+
 /**
  * Status of a file, directory or symlink
  * Optionally includes a file's block locations if requested by client on the rpc call.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index ab0ccdb1ea9..3d8fe35920b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.protocolPB;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -33,6 +35,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -41,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -209,6 +213,8 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathR
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPolicyRequestProto;
@@ -217,6 +223,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodin
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodingPolicyResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
@@ -233,7 +240,6 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
@@ -1612,6 +1618,29 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
     }
   }
 
+  @Override
+  public AddErasureCodingPoliciesResponseProto addErasureCodingPolicies(
+      RpcController controller, AddErasureCodingPoliciesRequestProto request)
+      throws ServiceException {
+    try {
+      ErasureCodingPolicy[] policies = request.getEcPoliciesList().stream()
+          .map(PBHelperClient::convertErasureCodingPolicy)
+          .toArray(ErasureCodingPolicy[]::new);
+      AddingECPolicyResponse[] result = server
+          .addErasureCodingPolicies(policies);
+
+      List<HdfsProtos.AddingECPolicyResponseProto> responseProtos = Arrays
+          .stream(result).map(PBHelperClient::convertAddingECPolicyResponse)
+          .collect(Collectors.toList());
+      AddErasureCodingPoliciesResponseProto response =
+          AddErasureCodingPoliciesResponseProto.newBuilder()
+              .addAllResponses(responseProtos).build();
+      return response;
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
   @Override
   public GetErasureCodingPolicyResponseProto getErasureCodingPolicy(RpcController controller,
       GetErasureCodingPolicyRequestProto request) throws ServiceException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java
index 177c0e09fe5..4f27ed8a403 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java
@@ -20,14 +20,16 @@ package org.apache.hadoop.hdfs.server.namenode;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
-
+import java.util.stream.Stream;
 
 /**
  * This manages erasure coding policies predefined and activated in the system.
@@ -38,6 +40,7 @@ import java.util.stream.Collectors;
  */
@InterfaceAudience.LimitedPrivate({"HDFS"})
 public final class ErasureCodingPolicyManager {
+  private static final byte USER_DEFINED_POLICY_START_ID = 32;
 
   // Supported storage policies for striped EC files
   private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE =
@@ -46,17 +49,44 @@ public final class ErasureCodingPolicyManager {
           HdfsConstants.COLD_STORAGE_POLICY_ID,
           HdfsConstants.ALLSSD_STORAGE_POLICY_ID};
 
+  /**
+   * All user-defined policies sorted by name for fast querying.
+   */
+  private Map<String, ErasureCodingPolicy> userPoliciesByName;
+
+  /**
+   * All user-defined policies sorted by ID for fast querying.
+   */
+  private Map<Byte, ErasureCodingPolicy> userPoliciesByID;
+
   /**
    * All enabled policies maintained in NN memory for fast querying,
    * identified and sorted by its name.
    */
-  private final Map<String, ErasureCodingPolicy> enabledPoliciesByName;
+  private Map<String, ErasureCodingPolicy> enabledPoliciesByName;
 
-  ErasureCodingPolicyManager(Configuration conf) {
+  private volatile static ErasureCodingPolicyManager instance = null;
+
+  public static ErasureCodingPolicyManager getInstance() {
+    if (instance == null) {
+      instance = new ErasureCodingPolicyManager();
+    }
+    return instance;
+  }
+
+  private ErasureCodingPolicyManager() {}
+
+  public void init(Configuration conf) {
+    this.loadPolicies(conf);
+  }
+
+  private void loadPolicies(Configuration conf) {
     // Populate the list of enabled policies from configuration
     final String[] policyNames = conf.getTrimmedStrings(
         DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
         DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT);
+    this.userPoliciesByID = new TreeMap<>();
+    this.userPoliciesByName = new TreeMap<>();
     this.enabledPoliciesByName = new TreeMap<>();
     for (String policyName : policyNames) {
       if (policyName.trim().isEmpty()) {
@@ -119,6 +149,39 @@ public final class ErasureCodingPolicyManager {
     return isPolicySuitable;
   }
 
+  /**
+   * Get all system-defined policies and user-defined policies.
+   * @return all policies
+   */
+  public List<ErasureCodingPolicy> getPolicies() {
+    return Stream.concat(SystemErasureCodingPolicies.getPolicies().stream(),
+        this.userPoliciesByID.values().stream()).collect(Collectors.toList());
+  }
+
+  /**
+   * Get a policy by policy ID, including system and user-defined policies.
+   * @return ecPolicy, or null if not found
+   */
+  public ErasureCodingPolicy getByID(byte id) {
+    ErasureCodingPolicy policy = SystemErasureCodingPolicies.getByID(id);
+    if (policy == null) {
+      return this.userPoliciesByID.get(id);
+    }
+    return policy;
+  }
+
+  /**
+   * Get a policy by policy name, including system and user-defined policies.
+   * @return ecPolicy, or null if not found
+   */
+  public ErasureCodingPolicy getByName(String name) {
+    ErasureCodingPolicy policy = SystemErasureCodingPolicies.getByName(name);
+    if (policy == null) {
+      return this.userPoliciesByName.get(name);
+    }
+    return policy;
+  }
+
   /**
    * Clear and clean up.
    */
@@ -126,4 +189,30 @@ public final class ErasureCodingPolicyManager {
     // TODO: we should only clear policies loaded from NN metadata.
     // This is a placeholder for HDFS-7337.
   }
+
+  public synchronized void addPolicy(ErasureCodingPolicy policy)
+      throws IllegalECPolicyException {
+    String assignedNewName = ErasureCodingPolicy.composePolicyName(
+        policy.getSchema(), policy.getCellSize());
+    for (ErasureCodingPolicy p : getPolicies()) {
+      if (p.getName().equals(assignedNewName)) {
+        throw new IllegalECPolicyException("The policy name already exists");
+      }
+      if (p.getSchema().equals(policy.getSchema()) &&
+          p.getCellSize() == policy.getCellSize()) {
+        throw new IllegalECPolicyException("A policy with same schema and "
+            + "cell size already exists");
+      }
+    }
+    policy.setName(assignedNewName);
+    policy.setId(getNextAvailablePolicyID());
+    this.userPoliciesByName.put(policy.getName(), policy);
+    this.userPoliciesByID.put(policy.getId(), policy);
+  }
+
+  private byte getNextAvailablePolicyID() {
+    byte currentId = this.userPoliciesByID.keySet().stream()
+        .max(Byte::compareTo).orElse(USER_DEFINED_POLICY_START_ID);
+    return (byte) (currentId + 1);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java
index 763b935b303..aa0babd14f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.XAttrHelper;
-import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
@@ -210,6 +210,12 @@ final class FSDirErasureCodingOp {
     return fsd.getAuditFileInfo(iip);
   }
 
+  static void addErasureCodePolicy(final FSNamesystem fsn,
+      ErasureCodingPolicy policy) throws IllegalECPolicyException {
+    Preconditions.checkNotNull(policy);
+    fsn.getErasureCodingPolicyManager().addPolicy(policy);
+  }
+
   private static List<XAttr> removeErasureCodingPolicyXAttr(
       final FSNamesystem fsn, final INodesInPath srcIIP) throws IOException {
     FSDirectory fsd = fsn.getFSDirectory();
@@ -319,7 +325,7 @@ final class FSDirErasureCodingOp {
       if (inode.isFile()) {
         byte id = inode.asFile().getErasureCodingPolicyID();
         return id < 0 ? null :
-            SystemErasureCodingPolicies.getByID(id);
+            fsd.getFSNamesystem().getErasureCodingPolicyManager().getByID(id);
       }
       // We don't allow setting EC policies on paths with a symlink. Thus
       // if a symlink is encountered, the dir shouldn't have EC policy.
@@ -334,7 +340,8 @@ final class FSDirErasureCodingOp {
           ByteArrayInputStream bIn = new ByteArrayInputStream(xattr.getValue());
           DataInputStream dIn = new DataInputStream(bIn);
           String ecPolicyName = WritableUtils.readString(dIn);
-          return SystemErasureCodingPolicies.getByName(ecPolicyName);
+          return fsd.getFSNamesystem().getErasureCodingPolicyManager()
+              .getByName(ecPolicyName);
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 9c89be15484..9f8be89d64a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -336,7 +335,7 @@ public final class FSImageFormatPBINode {
       assert ((!isStriped) || (isStriped && !f.hasReplication()));
       Short replication = (!isStriped ? (short) f.getReplication() : null);
       ErasureCodingPolicy ecPolicy = isStriped ?
-          SystemErasureCodingPolicies.getByID(
+          fsn.getErasureCodingPolicyManager().getByID(
               (byte) f.getErasureCodingPolicyID()) : null;
       Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 3dbfdf9f935..103437a5080 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -175,8 +175,10 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
+import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -193,6 +195,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -211,7 +214,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -429,7 +431,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   private final BlockManager blockManager;
   private final SnapshotManager snapshotManager;
   private final CacheManager cacheManager;
-  private final ErasureCodingPolicyManager ecPolicyManager;
   private final DatanodeStatistics datanodeStatistics;
 
   private String nameserviceId;
@@ -593,9 +594,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     leaseManager.removeAllLeases();
     snapshotManager.clearSnapshottableDirs();
     cacheManager.clear();
-    ecPolicyManager.clear();
     setImageLoaded(false);
     blockManager.clear();
+    ErasureCodingPolicyManager.getInstance().clear();
   }
 
   @VisibleForTesting
@@ -846,7 +847,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       this.dir = new FSDirectory(this, conf);
       this.snapshotManager = new SnapshotManager(conf, dir);
       this.cacheManager = new CacheManager(this, conf, blockManager);
-      this.ecPolicyManager = new ErasureCodingPolicyManager(conf);
+      // Init ErasureCodingPolicyManager instance.
+      ErasureCodingPolicyManager.getInstance().init(conf);
      this.topConf = new TopConf(conf);
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
@@ -4831,7 +4833,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         NameNode.stateChangeLog.info("*DIR* reportBadBlocks for block: {} on"
             + " datanode: {}", blk, nodes[j].getXferAddr());
         blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j],
-            storageIDs == null ? null: storageIDs[j],
+            storageIDs == null ? null: storageIDs[j],
             "client machine reported it");
       }
     }
@@ -5753,7 +5755,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   /** @return the ErasureCodingPolicyManager. */
   public ErasureCodingPolicyManager getErasureCodingPolicyManager() {
-    return ecPolicyManager;
+    return ErasureCodingPolicyManager.getInstance();
   }
 
   @Override
@@ -6823,6 +6825,28 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         resultingStat);
   }
 
+  /**
+   * Add multiple erasure coding policies to the ErasureCodingPolicyManager.
+   * @param policies The policies to add.
+   * @return The result of each add operation.
+   */
+  AddingECPolicyResponse[] addECPolicies(ErasureCodingPolicy[] policies)
+      throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    List<AddingECPolicyResponse> responses = new ArrayList<>();
+    writeLock();
+    for (ErasureCodingPolicy policy : policies) {
+      try {
+        FSDirErasureCodingOp.addErasureCodePolicy(this, policy);
+        responses.add(new AddingECPolicyResponse(policy));
+      } catch (IllegalECPolicyException e) {
+        responses.add(new AddingECPolicyResponse(policy, e));
+      }
+    }
+    writeUnlock("addECPolicies");
+    return responses.toArray(new AddingECPolicyResponse[0]);
+  }
+
   /**
    * Unset an erasure coding policy from the given path.
    * @param srcArg  The path of the target directory.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index f35bf3ce5ba..9cdac06cc81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@@ -192,7 +191,7 @@ public class INodeFile extends INodeWithAdditionalFields
     if (blockType == STRIPED) {
       Preconditions.checkArgument(replication == null &&
           erasureCodingPolicyID != null);
-      Preconditions.checkArgument(SystemErasureCodingPolicies
+      Preconditions.checkArgument(ErasureCodingPolicyManager.getInstance()
          .getByID(erasureCodingPolicyID) != null,
           "Could not find EC policy with ID 0x" + StringUtils
               .byteToHexString(erasureCodingPolicyID));
@@ -516,8 +515,8 @@ public class INodeFile extends INodeWithAdditionalFields
       return max;
     }
 
-    ErasureCodingPolicy ecPolicy =
-        SystemErasureCodingPolicies.getByID(getErasureCodingPolicyID());
+    ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager.getInstance()
+        .getByID(getErasureCodingPolicyID());
     Preconditions.checkNotNull(ecPolicy, "Could not find EC policy with ID 0x"
         + StringUtils.byteToHexString(getErasureCodingPolicyID()));
     return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index ae6430f726f..2f969ff4d42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AclException;
+import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -2261,6 +2262,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     }
   }
 
+  @Override
+  public AddingECPolicyResponse[] addErasureCodingPolicies(
+      ErasureCodingPolicy[] policies) throws IOException {
+    checkNNStartup();
+    return namesystem.addECPolicies(policies);
+  }
+
   @Override // ReconfigurationProtocol
   public void startReconfiguration() throws IOException {
     checkNNStartup();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/ECAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/ECAdmin.java
index 52f75342173..b4debf5fc02 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/ECAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/ECAdmin.java
@@ -22,7 +22,9 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.util.ECPolicyLoader;
 import org.apache.hadoop.tools.TableListing;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -135,6 +137,66 @@ public class ECAdmin extends Configured implements Tool {
     }
   }
 
+  /** Command to add a set of erasure coding policies. */
+  private static class AddECPoliciesCommand
+      implements AdminHelper.Command {
+    @Override
+    public String getName() {
+      return "-addPolicies";
+    }
+
+    @Override
+    public String getShortUsage() {
+      return "[" + getName() + " -policyFile <file>]\n";
+    }
+
+    @Override
+    public String getLongUsage() {
+      final TableListing listing = AdminHelper.getOptionDescriptionListing();
+      listing.addRow("<file>",
+          "The path of the xml file which defines the EC policies to add");
+      return getShortUsage() + "\n"
+          + "Add a list of erasure coding policies.\n"
+          + listing.toString();
+    }
+
+    @Override
+    public int run(Configuration conf, List<String> args) throws IOException {
+      final String filePath =
+          StringUtils.popOptionWithArgument("-policyFile", args);
+      if (filePath == null) {
+        System.err.println("Please specify the path with -policyFile.\nUsage: "
+            + getLongUsage());
+        return 1;
+      }
+
+      if (args.size() > 0) {
+        System.err.println(getName() + ": Too many arguments");
+        return 1;
+      }
+
+      final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
+      try {
+        List<ErasureCodingPolicy> policies =
+            new ECPolicyLoader().loadPolicy(filePath);
+        if (policies.size() > 0) {
+          AddingECPolicyResponse[] responses = dfs.addErasureCodingPolicies(
+              policies.toArray(new ErasureCodingPolicy[policies.size()]));
+          for (AddingECPolicyResponse response : responses) {
+            System.out.println(response);
+          }
+        } else {
+          System.out.println("No EC policy parsed out from " + filePath);
+        }
+
+      } catch (IOException e) {
+        System.err.println(AdminHelper.prettifyException(e));
+        return 2;
+      }
+      return 0;
+    }
+  }
+
   /** Command to get the erasure coding policy for a file or directory */
   private static class GetECPolicyCommand implements AdminHelper.Command {
     @Override
@@ -301,6 +363,7 @@ public class ECAdmin extends Configured implements Tool {
 
   private static final AdminHelper.Command[] COMMANDS = {
       new ListECPoliciesCommand(),
+      new AddECPoliciesCommand(),
       new GetECPolicyCommand(),
       new SetECPolicyCommand(),
       new UnsetECPolicyCommand()
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md
index dbfb1118b37..e8065491774 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md
@@ -178,3 +178,7 @@ Below are the details about each command.
  *  `[-listPolicies]`
 
      Lists the set of enabled erasure coding policies. These names are suitable for use with the `setPolicy` command.
+
+ *  `[-addPolicies -policyFile <file>]`
+
+     Add a list of erasure coding policies. Please refer to etc/hadoop/user_ec_policies.xml.template for the example policy file.
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index e9af5949214..f024fb6a5f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -49,6 +49,8 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -75,14 +77,18 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.ScriptBasedMapping;
@@ -1443,4 +1449,37 @@ public class TestDistributedFileSystem {
       }
     }
   }
+
+  @Test
+  public void testAddErasureCodingPolicies() throws Exception {
+    Configuration conf = getTestConfiguration();
+    MiniDFSCluster cluster = null;
+
+    try {
+      ErasureCodingPolicy policy1 =
+          SystemErasureCodingPolicies.getPolicies().get(0);
+      conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
+          Stream.of(policy1).map(ErasureCodingPolicy::getName)
+              .collect(Collectors.joining(", ")));
+
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      ECSchema toAddSchema = new ECSchema("testcodec", 3, 2);
+      ErasureCodingPolicy toAddPolicy =
+          new ErasureCodingPolicy(toAddSchema, 128 * 1024, (byte) 254);
+      ErasureCodingPolicy[] policies = new ErasureCodingPolicy[]{
+          policy1, toAddPolicy};
+      AddingECPolicyResponse[] responses =
+          fs.addErasureCodingPolicies(policies);
+      assertEquals(2, responses.length);
+      assertFalse(responses[0].isSucceed());
+      assertTrue(responses[1].isSucceed());
+      assertTrue(responses[1].getPolicy().getId() > 0);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index 7647ac45fb1..5d5260938bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.protocolPB;
 
 import com.google.protobuf.UninitializedMessageException;
+import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -907,6 +908,40 @@ public class TestPBHelper {
         fsServerDefaults.getKeyProviderUri());
   }
 
+  @Test
+  public void testConvertAddingECPolicyResponse() throws Exception {
+    // Check conversion of the built-in policies.
+    for (ErasureCodingPolicy policy :
+        SystemErasureCodingPolicies.getPolicies()) {
+      AddingECPolicyResponse response = new AddingECPolicyResponse(policy);
+      HdfsProtos.AddingECPolicyResponseProto proto = PBHelperClient
+          .convertAddingECPolicyResponse(response);
+      // Optional fields should not be set.
+      assertFalse("Unnecessary field is set.", proto.hasErrorMsg());
+      // Convert proto back to an object and check for equality.
+      AddingECPolicyResponse convertedResponse = PBHelperClient
+          .convertAddingECPolicyResponse(proto);
+      assertEquals("Converted policy not equal", response.getPolicy(),
+          convertedResponse.getPolicy());
+      assertEquals("Converted policy not equal", response.isSucceed(),
+          convertedResponse.isSucceed());
+    }
+
+    ErasureCodingPolicy policy = SystemErasureCodingPolicies
+        .getPolicies().get(0);
+    AddingECPolicyResponse response =
+        new AddingECPolicyResponse(policy, "failed");
+    HdfsProtos.AddingECPolicyResponseProto proto = PBHelperClient
+        .convertAddingECPolicyResponse(response);
+    // Convert proto back to an object and check for equality.
+    AddingECPolicyResponse convertedResponse = PBHelperClient
+        .convertAddingECPolicyResponse(proto);
+    assertEquals("Converted policy not equal", response.getPolicy(),
+        convertedResponse.getPolicy());
+    assertEquals("Converted policy not equal", response.getErrorMsg(),
+        convertedResponse.getErrorMsg());
+  }
+
   @Test
   public void testConvertErasureCodingPolicy() throws Exception {
     // Check conversion of the built-in policies.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEnabledECPolicies.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEnabledECPolicies.java
index e35fa119ecb..fe95734cb37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEnabledECPolicies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEnabledECPolicies.java
@@ -50,7 +50,7 @@ public class TestEnabledECPolicies {
     conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, value);
     try {
-      new ErasureCodingPolicyManager(conf);
+      ErasureCodingPolicyManager.getInstance().init(conf);
       fail("Expected exception when instantiating ECPolicyManager");
     } catch (IllegalArgumentException e) {
       GenericTestUtils.assertExceptionContains("is not a valid policy", e);
@@ -62,7 +62,9 @@ public class TestEnabledECPolicies {
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, value);
-    ErasureCodingPolicyManager manager = new ErasureCodingPolicyManager(conf);
+    ErasureCodingPolicyManager manager =
+        ErasureCodingPolicyManager.getInstance();
+    manager.init(conf);
     assertEquals("Incorrect number of enabled policies",
         numEnabled, manager.getEnabledPolicies().length);
   }
@@ -130,8 +132,9 @@ public class TestEnabledECPolicies {
         Arrays.asList(enabledPolicies).stream()
             .map(ErasureCodingPolicy::getName)
             .collect(Collectors.joining(", ")));
-    ErasureCodingPolicyManager manager = new ErasureCodingPolicyManager(conf);
-
+    ErasureCodingPolicyManager manager =
+        ErasureCodingPolicyManager.getInstance();
+    manager.init(conf);
     // Check that returned values are unique
     Set<String> found = new HashSet<>();
     for (ErasureCodingPolicy p : manager.getEnabledPolicies()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testErasureCodingConf.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testErasureCodingConf.xml
index 0a71109d789..812852b3d7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testErasureCodingConf.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testErasureCodingConf.xml
@@ -144,6 +144,25 @@
       </comparators>
     </test>
 
+    <test>
+      <description>help: addPolicies command</description>
+      <test-commands>
+        <ec-admin-command>-fs NAMENODE -help addPolicies</ec-admin-command>
+      </test-commands>
+      <cleanup-commands>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>Add a list of erasure coding policies</expected-output>
+        </comparator>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>[-addPolicies -policyFile &lt;file&gt;]</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
     <test>
       <description>setPolicy : set erasure coding policy on a directory to encode files</description>
@@ -311,6 +330,25 @@
       </comparators>
     </test>
 
+    <test>
+      <description>addPolicies : add a list of ECPolicies</description>
+      <test-commands>
+        <ec-admin-command>-fs NAMENODE -addPolicies -policyFile CLITEST_DATA/test_ec_policies.xml</ec-admin-command>
+      </test-commands>
+      <cleanup-commands>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>Add ErasureCodingPolicy XOR-2-1-128k succeed</expected-output>
+        </comparator>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>Add ErasureCodingPolicy RS-6-3-64k failed</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
     <test>
       <description>setPolicy : illegal parameters - path is missing</description>
@@ -473,5 +511,35 @@
       </comparators>
     </test>
 
+    <test>
+      <description>addPolicies : illegal parameters - policyFile is missing</description>
+      <test-commands>
+        <ec-admin-command>-fs NAMENODE -addPolicies /etc</ec-admin-command>
+      </test-commands>
+      <cleanup-commands>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>Please specify the path with -policyFile</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <test>
+      <description>addPolicies : illegal parameters - too many parameters</description>
+      <test-commands>
+        <ec-admin-command>-fs NAMENODE -addPolicies -policyFile /ecdir /ecdir2</ec-admin-command>
+      </test-commands>
+      <cleanup-commands>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>-addPolicies: Too many arguments</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
   </tests>
 </configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/test_ec_policies.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/test_ec_policies.xml
new file mode 100644
index 00000000000..b2416ac04a7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/test_ec_policies.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+
+<layoutversion>1</layoutversion>
+
+<schemas>
+  <schema id="XORk2m1">
+    <codec>xor</codec>
+    <k>2</k>
+    <m>1</m>
+  </schema>
+  <schema id="RSk6m3">
+    <codec>RS</codec>
+    <k>6</k>
+    <m>3</m>
+  </schema>
+</schemas>
+
+<policies>
+  <policy>
+    <schema>XORk2m1</schema>
+    <cellsize>131072</cellsize>
+  </policy>
+  <policy>
+    <schema>RSk6m3</schema>
+    <cellsize>65536</cellsize>
+  </policy>
+</policies>
+
+</configuration>