HDFS-11605. Allow user to customize new erasure code policies. Contributed by Huafeng Wang
This commit is contained in:
parent
371b6467dc
commit
cb672a45a0
|
@ -199,6 +199,7 @@ public final class ECSchema {
|
|||
return sb.toString();
|
||||
}
|
||||
|
||||
// Todo: Further use `extraOptions` to compare ECSchemas
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o == null) {
|
||||
|
|
|
@ -101,6 +101,7 @@ import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
|||
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
|
@ -2763,6 +2764,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
}
|
||||
}
|
||||
|
||||
public AddingECPolicyResponse[] addErasureCodingPolicies(
|
||||
ErasureCodingPolicy[] policies) throws IOException {
|
||||
checkOpen();
|
||||
return namenode.addErasureCodingPolicies(policies);
|
||||
}
|
||||
|
||||
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
|
||||
checkOpen();
|
||||
return new DFSInotifyEventInputStream(namenode, tracer);
|
||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
|
||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
|
@ -2530,6 +2531,18 @@ public class DistributedFileSystem extends FileSystem {
|
|||
return Arrays.asList(dfs.getErasureCodingPolicies());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add Erasure coding policies to HDFS.
|
||||
*
|
||||
* @param policies The user defined ec policy list to add.
|
||||
* @return Return the response list of adding operations.
|
||||
* @throws IOException
|
||||
*/
|
||||
public AddingECPolicyResponse[] addErasureCodingPolicies(
|
||||
ErasureCodingPolicy[] policies) throws IOException {
|
||||
return dfs.addErasureCodingPolicies(policies);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unset the erasure coding policy from the source path.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
/**
|
||||
* A response of adding an ErasureCoding policy.
|
||||
*/
|
||||
public class AddingECPolicyResponse {
|
||||
private boolean succeed;
|
||||
private ErasureCodingPolicy policy;
|
||||
private String errorMsg;
|
||||
|
||||
public AddingECPolicyResponse(ErasureCodingPolicy policy) {
|
||||
this.policy = policy;
|
||||
this.succeed = true;
|
||||
}
|
||||
|
||||
public AddingECPolicyResponse(ErasureCodingPolicy policy,
|
||||
String errorMsg) {
|
||||
this.policy = policy;
|
||||
this.errorMsg = errorMsg;
|
||||
this.succeed = false;
|
||||
}
|
||||
|
||||
public AddingECPolicyResponse(ErasureCodingPolicy policy,
|
||||
IllegalECPolicyException e) {
|
||||
this(policy, e.getMessage());
|
||||
}
|
||||
|
||||
public boolean isSucceed() {
|
||||
return succeed;
|
||||
}
|
||||
|
||||
public ErasureCodingPolicy getPolicy() {
|
||||
return policy;
|
||||
}
|
||||
|
||||
public String getErrorMsg() {
|
||||
return errorMsg;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (isSucceed()) {
|
||||
return "Add ErasureCodingPolicy " + getPolicy().getName() + " succeed.";
|
||||
} else {
|
||||
return "Add ErasureCodingPolicy " + getPolicy().getName() + " failed and "
|
||||
+ "error message is " + getErrorMsg();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1520,6 +1520,17 @@ public interface ClientProtocol {
|
|||
void setErasureCodingPolicy(String src, String ecPolicyName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Add Erasure coding policies.
|
||||
*
|
||||
* @param policies The user defined ec policy list to add.
|
||||
* @return Return the response list of adding operations.
|
||||
* @throws IOException
|
||||
*/
|
||||
@AtMostOnce
|
||||
AddingECPolicyResponse[] addErasureCodingPolicies(
|
||||
ErasureCodingPolicy[] policies) throws IOException;
|
||||
|
||||
/**
|
||||
* Get the erasure coding policies loaded in Namenode
|
||||
*
|
||||
|
|
|
@ -31,10 +31,10 @@ import org.apache.hadoop.io.erasurecode.ECSchema;
|
|||
@InterfaceStability.Evolving
|
||||
public final class ErasureCodingPolicy {
|
||||
|
||||
private final String name;
|
||||
private final ECSchema schema;
|
||||
private final int cellSize;
|
||||
private final byte id;
|
||||
private String name;
|
||||
private byte id;
|
||||
|
||||
public ErasureCodingPolicy(String name, ECSchema schema,
|
||||
int cellSize, byte id) {
|
||||
|
@ -51,7 +51,7 @@ public final class ErasureCodingPolicy {
|
|||
this(composePolicyName(schema, cellSize), schema, cellSize, id);
|
||||
}
|
||||
|
||||
private static String composePolicyName(ECSchema schema, int cellSize) {
|
||||
public static String composePolicyName(ECSchema schema, int cellSize) {
|
||||
assert cellSize % 1024 == 0;
|
||||
return schema.getCodecName().toUpperCase() + "-" +
|
||||
schema.getNumDataUnits() + "-" + schema.getNumParityUnits() +
|
||||
|
@ -62,6 +62,10 @@ public final class ErasureCodingPolicy {
|
|||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public ECSchema getSchema() {
|
||||
return schema;
|
||||
}
|
||||
|
@ -86,6 +90,10 @@ public final class ErasureCodingPolicy {
|
|||
return id;
|
||||
}
|
||||
|
||||
public void setId(byte id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o == null) {
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* An Exception indicates the error when adding an ErasureCoding policy.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class IllegalECPolicyException extends Exception {
|
||||
static final long serialVersionUID = 1L;
|
||||
|
||||
public IllegalECPolicyException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ import java.util.List;
|
|||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -47,6 +48,7 @@ import org.apache.hadoop.fs.permission.FsAction;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.hdfs.inotify.EventBatchList;
|
||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
|
@ -168,6 +170,8 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncrypt
|
|||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPolicyRequestProto;
|
||||
|
@ -1624,6 +1628,27 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddingECPolicyResponse[] addErasureCodingPolicies(
|
||||
ErasureCodingPolicy[] policies) throws IOException {
|
||||
List<ErasureCodingPolicyProto> protos = Arrays.stream(policies)
|
||||
.map(PBHelperClient::convertErasureCodingPolicy)
|
||||
.collect(Collectors.toList());
|
||||
AddErasureCodingPoliciesRequestProto req =
|
||||
AddErasureCodingPoliciesRequestProto.newBuilder()
|
||||
.addAllEcPolicies(protos).build();
|
||||
try {
|
||||
AddErasureCodingPoliciesResponseProto rep = rpcProxy
|
||||
.addErasureCodingPolicies(null, req);
|
||||
AddingECPolicyResponse[] responses = rep.getResponsesList().stream()
|
||||
.map(PBHelperClient::convertAddingECPolicyResponse)
|
||||
.toArray(AddingECPolicyResponse[]::new);
|
||||
return responses;
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ErasureCodingPolicy[] getErasureCodingPolicies() throws IOException {
|
||||
try {
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
|
|||
import org.apache.hadoop.hdfs.inotify.Event;
|
||||
import org.apache.hadoop.hdfs.inotify.EventBatch;
|
||||
import org.apache.hadoop.hdfs.inotify.EventBatchList;
|
||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||
|
@ -123,6 +124,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmS
|
|||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AddingECPolicyResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTypeProto;
|
||||
|
@ -2679,6 +2681,28 @@ public class PBHelperClient {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
public static AddingECPolicyResponseProto convertAddingECPolicyResponse(
|
||||
AddingECPolicyResponse response) {
|
||||
AddingECPolicyResponseProto.Builder builder =
|
||||
AddingECPolicyResponseProto.newBuilder()
|
||||
.setPolicy(convertErasureCodingPolicy(response.getPolicy()))
|
||||
.setSucceed(response.isSucceed());
|
||||
if (!response.isSucceed()) {
|
||||
builder.setErrorMsg(response.getErrorMsg());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static AddingECPolicyResponse convertAddingECPolicyResponse(
|
||||
AddingECPolicyResponseProto proto) {
|
||||
ErasureCodingPolicy policy = convertErasureCodingPolicy(proto.getPolicy());
|
||||
if (proto.getSucceed()) {
|
||||
return new AddingECPolicyResponse(policy);
|
||||
} else {
|
||||
return new AddingECPolicyResponse(policy, proto.getErrorMsg());
|
||||
}
|
||||
}
|
||||
|
||||
public static HdfsProtos.DatanodeInfosProto convertToProto(
|
||||
DatanodeInfo[] datanodeInfos) {
|
||||
HdfsProtos.DatanodeInfosProto.Builder builder =
|
||||
|
|
|
@ -32,6 +32,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
|
|||
import javax.xml.parsers.ParserConfigurationException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
import java.util.List;
|
||||
|
@ -59,17 +60,16 @@ public class ECPolicyLoader {
|
|||
* @return all valid EC policies in EC policy file
|
||||
*/
|
||||
public List<ErasureCodingPolicy> loadPolicy(String policyFilePath) {
|
||||
File policyFile = getPolicyFile(policyFilePath);
|
||||
if (policyFile == null) {
|
||||
LOG.warn("Not found any EC policy file");
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
try {
|
||||
File policyFile = getPolicyFile(policyFilePath);
|
||||
if (!policyFile.exists()) {
|
||||
LOG.warn("Not found any EC policy file");
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return loadECPolicies(policyFile);
|
||||
} catch (ParserConfigurationException | IOException | SAXException e) {
|
||||
throw new RuntimeException("Failed to load EC policy file: "
|
||||
+ policyFile);
|
||||
+ policyFilePath);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -220,15 +220,12 @@ public class ECPolicyLoader {
|
|||
* @param policyFilePath path of EC policy file
|
||||
* @return EC policy file
|
||||
*/
|
||||
private File getPolicyFile(String policyFilePath) {
|
||||
private File getPolicyFile(String policyFilePath)
|
||||
throws MalformedURLException {
|
||||
File policyFile = new File(policyFilePath);
|
||||
if (!policyFile.isAbsolute()) {
|
||||
URL url = Thread.currentThread().getContextClassLoader()
|
||||
.getResource(policyFilePath);
|
||||
if (url == null) {
|
||||
LOG.warn(policyFilePath + " not found on the classpath.");
|
||||
policyFile = null;
|
||||
} else if (!url.getProtocol().equalsIgnoreCase("file")) {
|
||||
URL url = new URL(policyFilePath);
|
||||
if (!url.getProtocol().equalsIgnoreCase("file")) {
|
||||
throw new RuntimeException(
|
||||
"EC policy file " + url
|
||||
+ " found on the classpath is not on the local filesystem.");
|
||||
|
|
|
@ -907,6 +907,8 @@ service ClientNamenodeProtocol {
|
|||
returns(GetEditsFromTxidResponseProto);
|
||||
rpc getErasureCodingPolicies(GetErasureCodingPoliciesRequestProto)
|
||||
returns(GetErasureCodingPoliciesResponseProto);
|
||||
rpc addErasureCodingPolicies(AddErasureCodingPoliciesRequestProto)
|
||||
returns(AddErasureCodingPoliciesResponseProto);
|
||||
rpc getErasureCodingPolicy(GetErasureCodingPolicyRequestProto)
|
||||
returns(GetErasureCodingPolicyResponseProto);
|
||||
rpc getQuotaUsage(GetQuotaUsageRequestProto)
|
||||
|
|
|
@ -46,6 +46,14 @@ message GetErasureCodingPolicyResponseProto {
|
|||
optional ErasureCodingPolicyProto ecPolicy = 1;
|
||||
}
|
||||
|
||||
message AddErasureCodingPoliciesRequestProto {
|
||||
repeated ErasureCodingPolicyProto ecPolicies = 1;
|
||||
}
|
||||
|
||||
message AddErasureCodingPoliciesResponseProto {
|
||||
repeated AddingECPolicyResponseProto responses = 1;
|
||||
}
|
||||
|
||||
message UnsetErasureCodingPolicyRequestProto {
|
||||
required string src = 1;
|
||||
}
|
||||
|
|
|
@ -371,6 +371,12 @@ message ErasureCodingPolicyProto {
|
|||
required uint32 id = 4; // Actually a byte - only 8 bits used
|
||||
}
|
||||
|
||||
message AddingECPolicyResponseProto {
|
||||
required ErasureCodingPolicyProto policy = 1;
|
||||
required bool succeed = 2;
|
||||
optional string errorMsg = 3;
|
||||
}
|
||||
|
||||
/**
|
||||
* Status of a file, directory or symlink
|
||||
* Optionally includes a file's block locations if requested by client on the rpc call.
|
||||
|
|
|
@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.protocolPB;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -33,6 +35,7 @@ import org.apache.hadoop.fs.CreateFlag;
|
|||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.fs.QuotaUsage;
|
||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
|
@ -41,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|||
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
|
@ -209,6 +213,8 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathR
|
|||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPolicyRequestProto;
|
||||
|
@ -217,6 +223,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodin
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodingPolicyResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
|
||||
|
@ -233,7 +240,6 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
|||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
|
||||
|
@ -1612,6 +1618,29 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddErasureCodingPoliciesResponseProto addErasureCodingPolicies(
|
||||
RpcController controller, AddErasureCodingPoliciesRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
ErasureCodingPolicy[] policies = request.getEcPoliciesList().stream()
|
||||
.map(PBHelperClient::convertErasureCodingPolicy)
|
||||
.toArray(ErasureCodingPolicy[]::new);
|
||||
AddingECPolicyResponse[] result = server
|
||||
.addErasureCodingPolicies(policies);
|
||||
|
||||
List<HdfsProtos.AddingECPolicyResponseProto> responseProtos = Arrays
|
||||
.stream(result).map(PBHelperClient::convertAddingECPolicyResponse)
|
||||
.collect(Collectors.toList());
|
||||
AddErasureCodingPoliciesResponseProto response =
|
||||
AddErasureCodingPoliciesResponseProto.newBuilder()
|
||||
.addAllResponses(responseProtos).build();
|
||||
return response;
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetErasureCodingPolicyResponseProto getErasureCodingPolicy(RpcController controller,
|
||||
GetErasureCodingPolicyRequestProto request) throws ServiceException {
|
||||
|
|
|
@ -20,14 +20,16 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* This manages erasure coding policies predefined and activated in the system.
|
||||
|
@ -38,6 +40,7 @@ import java.util.stream.Collectors;
|
|||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS"})
|
||||
public final class ErasureCodingPolicyManager {
|
||||
private static final byte USER_DEFINED_POLICY_START_ID = 32;
|
||||
|
||||
// Supported storage policies for striped EC files
|
||||
private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE =
|
||||
|
@ -46,17 +49,44 @@ public final class ErasureCodingPolicyManager {
|
|||
HdfsConstants.COLD_STORAGE_POLICY_ID,
|
||||
HdfsConstants.ALLSSD_STORAGE_POLICY_ID};
|
||||
|
||||
/**
|
||||
* All user defined policies sorted by name for fast querying.
|
||||
*/
|
||||
private Map<String, ErasureCodingPolicy> userPoliciesByName;
|
||||
|
||||
/**
|
||||
* All user defined policies sorted by ID for fast querying.
|
||||
*/
|
||||
private Map<Byte, ErasureCodingPolicy> userPoliciesByID;
|
||||
|
||||
/**
|
||||
* All enabled policies maintained in NN memory for fast querying,
|
||||
* identified and sorted by its name.
|
||||
*/
|
||||
private final Map<String, ErasureCodingPolicy> enabledPoliciesByName;
|
||||
private Map<String, ErasureCodingPolicy> enabledPoliciesByName;
|
||||
|
||||
ErasureCodingPolicyManager(Configuration conf) {
|
||||
private volatile static ErasureCodingPolicyManager instance = null;
|
||||
|
||||
public static ErasureCodingPolicyManager getInstance() {
|
||||
if (instance == null) {
|
||||
instance = new ErasureCodingPolicyManager();
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
private ErasureCodingPolicyManager() {}
|
||||
|
||||
public void init(Configuration conf) {
|
||||
this.loadPolicies(conf);
|
||||
}
|
||||
|
||||
private void loadPolicies(Configuration conf) {
|
||||
// Populate the list of enabled policies from configuration
|
||||
final String[] policyNames = conf.getTrimmedStrings(
|
||||
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_DEFAULT);
|
||||
this.userPoliciesByID = new TreeMap<>();
|
||||
this.userPoliciesByName = new TreeMap<>();
|
||||
this.enabledPoliciesByName = new TreeMap<>();
|
||||
for (String policyName : policyNames) {
|
||||
if (policyName.trim().isEmpty()) {
|
||||
|
@ -119,6 +149,39 @@ public final class ErasureCodingPolicyManager {
|
|||
return isPolicySuitable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all system defined policies and user defined policies.
|
||||
* @return all policies
|
||||
*/
|
||||
public List<ErasureCodingPolicy> getPolicies() {
|
||||
return Stream.concat(SystemErasureCodingPolicies.getPolicies().stream(),
|
||||
this.userPoliciesByID.values().stream()).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a policy by policy ID, including system policy and user defined policy.
|
||||
* @return ecPolicy, or null if not found
|
||||
*/
|
||||
public ErasureCodingPolicy getByID(byte id) {
|
||||
ErasureCodingPolicy policy = SystemErasureCodingPolicies.getByID(id);
|
||||
if (policy == null) {
|
||||
return this.userPoliciesByID.get(id);
|
||||
}
|
||||
return policy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a policy by policy ID, including system policy and user defined policy.
|
||||
* @return ecPolicy, or null if not found
|
||||
*/
|
||||
public ErasureCodingPolicy getByName(String name) {
|
||||
ErasureCodingPolicy policy = SystemErasureCodingPolicies.getByName(name);
|
||||
if (policy == null) {
|
||||
return this.userPoliciesByName.get(name);
|
||||
}
|
||||
return policy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear and clean up.
|
||||
*/
|
||||
|
@ -126,4 +189,30 @@ public final class ErasureCodingPolicyManager {
|
|||
// TODO: we should only clear policies loaded from NN metadata.
|
||||
// This is a placeholder for HDFS-7337.
|
||||
}
|
||||
|
||||
public synchronized void addPolicy(ErasureCodingPolicy policy)
|
||||
throws IllegalECPolicyException {
|
||||
String assignedNewName = ErasureCodingPolicy.composePolicyName(
|
||||
policy.getSchema(), policy.getCellSize());
|
||||
for (ErasureCodingPolicy p : getPolicies()) {
|
||||
if (p.getName().equals(assignedNewName)) {
|
||||
throw new IllegalECPolicyException("The policy name already exists");
|
||||
}
|
||||
if (p.getSchema().equals(policy.getSchema()) &&
|
||||
p.getCellSize() == policy.getCellSize()) {
|
||||
throw new IllegalECPolicyException("A policy with same schema and " +
|
||||
"cell size already exists");
|
||||
}
|
||||
}
|
||||
policy.setName(assignedNewName);
|
||||
policy.setId(getNextAvailablePolicyID());
|
||||
this.userPoliciesByName.put(policy.getName(), policy);
|
||||
this.userPoliciesByID.put(policy.getId(), policy);
|
||||
}
|
||||
|
||||
private byte getNextAvailablePolicyID() {
|
||||
byte currentId = this.userPoliciesByID.keySet().stream()
|
||||
.max(Byte::compareTo).orElse(USER_DEFINED_POLICY_START_ID);
|
||||
return (byte) (currentId + 1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ import org.apache.hadoop.fs.XAttrSetFlag;
|
|||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
|
||||
|
@ -210,6 +210,12 @@ final class FSDirErasureCodingOp {
|
|||
return fsd.getAuditFileInfo(iip);
|
||||
}
|
||||
|
||||
static void addErasureCodePolicy(final FSNamesystem fsn,
|
||||
ErasureCodingPolicy policy) throws IllegalECPolicyException {
|
||||
Preconditions.checkNotNull(policy);
|
||||
fsn.getErasureCodingPolicyManager().addPolicy(policy);
|
||||
}
|
||||
|
||||
private static List<XAttr> removeErasureCodingPolicyXAttr(
|
||||
final FSNamesystem fsn, final INodesInPath srcIIP) throws IOException {
|
||||
FSDirectory fsd = fsn.getFSDirectory();
|
||||
|
@ -319,7 +325,7 @@ final class FSDirErasureCodingOp {
|
|||
if (inode.isFile()) {
|
||||
byte id = inode.asFile().getErasureCodingPolicyID();
|
||||
return id < 0 ? null :
|
||||
SystemErasureCodingPolicies.getByID(id);
|
||||
fsd.getFSNamesystem().getErasureCodingPolicyManager().getByID(id);
|
||||
}
|
||||
// We don't allow setting EC policies on paths with a symlink. Thus
|
||||
// if a symlink is encountered, the dir shouldn't have EC policy.
|
||||
|
@ -334,7 +340,8 @@ final class FSDirErasureCodingOp {
|
|||
ByteArrayInputStream bIn = new ByteArrayInputStream(xattr.getValue());
|
||||
DataInputStream dIn = new DataInputStream(bIn);
|
||||
String ecPolicyName = WritableUtils.readString(dIn);
|
||||
return SystemErasureCodingPolicies.getByName(ecPolicyName);
|
||||
return fsd.getFSNamesystem().getErasureCodingPolicyManager()
|
||||
.getByName(ecPolicyName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
|
|||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
|
||||
|
@ -336,7 +335,7 @@ public final class FSImageFormatPBINode {
|
|||
assert ((!isStriped) || (isStriped && !f.hasReplication()));
|
||||
Short replication = (!isStriped ? (short) f.getReplication() : null);
|
||||
ErasureCodingPolicy ecPolicy = isStriped ?
|
||||
SystemErasureCodingPolicies.getByID(
|
||||
fsn.getErasureCodingPolicyManager().getByID(
|
||||
(byte) f.getErasureCodingPolicyID()) : null;
|
||||
Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
|
||||
|
||||
|
|
|
@ -175,8 +175,10 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
|
||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
|
@ -193,6 +195,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
|
@ -211,7 +214,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
|
@ -429,7 +431,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
private final BlockManager blockManager;
|
||||
private final SnapshotManager snapshotManager;
|
||||
private final CacheManager cacheManager;
|
||||
private final ErasureCodingPolicyManager ecPolicyManager;
|
||||
private final DatanodeStatistics datanodeStatistics;
|
||||
|
||||
private String nameserviceId;
|
||||
|
@ -593,9 +594,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
leaseManager.removeAllLeases();
|
||||
snapshotManager.clearSnapshottableDirs();
|
||||
cacheManager.clear();
|
||||
ecPolicyManager.clear();
|
||||
setImageLoaded(false);
|
||||
blockManager.clear();
|
||||
ErasureCodingPolicyManager.getInstance().clear();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -846,7 +847,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
this.dir = new FSDirectory(this, conf);
|
||||
this.snapshotManager = new SnapshotManager(conf, dir);
|
||||
this.cacheManager = new CacheManager(this, conf, blockManager);
|
||||
this.ecPolicyManager = new ErasureCodingPolicyManager(conf);
|
||||
// Init ErasureCodingPolicyManager instance.
|
||||
ErasureCodingPolicyManager.getInstance().init(conf);
|
||||
this.topConf = new TopConf(conf);
|
||||
this.auditLoggers = initAuditLoggers(conf);
|
||||
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
|
||||
|
@ -4831,7 +4833,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
NameNode.stateChangeLog.info("*DIR* reportBadBlocks for block: {} on"
|
||||
+ " datanode: {}", blk, nodes[j].getXferAddr());
|
||||
blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j],
|
||||
storageIDs == null ? null: storageIDs[j],
|
||||
storageIDs == null ? null: storageIDs[j],
|
||||
"client machine reported it");
|
||||
}
|
||||
}
|
||||
|
@ -5753,7 +5755,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
|
||||
/** @return the ErasureCodingPolicyManager. */
|
||||
public ErasureCodingPolicyManager getErasureCodingPolicyManager() {
|
||||
return ecPolicyManager;
|
||||
return ErasureCodingPolicyManager.getInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -6823,6 +6825,28 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
resultingStat);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add multiple erasure coding policies to the ErasureCodingPolicyManager.
|
||||
* @param policies The policies to add.
|
||||
* @return The according result of add operation.
|
||||
*/
|
||||
AddingECPolicyResponse[] addECPolicies(ErasureCodingPolicy[] policies)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
List<AddingECPolicyResponse> responses = new ArrayList<>();
|
||||
writeLock();
|
||||
for (ErasureCodingPolicy policy : policies) {
|
||||
try {
|
||||
FSDirErasureCodingOp.addErasureCodePolicy(this, policy);
|
||||
responses.add(new AddingECPolicyResponse(policy));
|
||||
} catch (IllegalECPolicyException e) {
|
||||
responses.add(new AddingECPolicyResponse(policy, e));
|
||||
}
|
||||
}
|
||||
writeUnlock("addECPolicies");
|
||||
return responses.toArray(new AddingECPolicyResponse[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unset an erasure coding policy from the given path.
|
||||
* @param srcArg The path of the target directory.
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
|
|||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
|
@ -192,7 +191,7 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
if (blockType == STRIPED) {
|
||||
Preconditions.checkArgument(replication == null &&
|
||||
erasureCodingPolicyID != null);
|
||||
Preconditions.checkArgument(SystemErasureCodingPolicies
|
||||
Preconditions.checkArgument(ErasureCodingPolicyManager.getInstance()
|
||||
.getByID(erasureCodingPolicyID) != null,
|
||||
"Could not find EC policy with ID 0x" + StringUtils
|
||||
.byteToHexString(erasureCodingPolicyID));
|
||||
|
@ -516,8 +515,8 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
return max;
|
||||
}
|
||||
|
||||
ErasureCodingPolicy ecPolicy =
|
||||
SystemErasureCodingPolicies.getByID(getErasureCodingPolicyID());
|
||||
ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager.getInstance()
|
||||
.getByID(getErasureCodingPolicyID());
|
||||
Preconditions.checkNotNull(ecPolicy, "Could not find EC policy with ID 0x"
|
||||
+ StringUtils.byteToHexString(getErasureCodingPolicyID()));
|
||||
return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
|
||||
|
|
|
@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
|||
import org.apache.hadoop.hdfs.inotify.EventBatch;
|
||||
import org.apache.hadoop.hdfs.inotify.EventBatchList;
|
||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
|
@ -2261,6 +2262,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddingECPolicyResponse[] addErasureCodingPolicies(
|
||||
ErasureCodingPolicy[] policies) throws IOException {
|
||||
checkNNStartup();
|
||||
return namesystem.addECPolicies(policies);
|
||||
}
|
||||
|
||||
@Override // ReconfigurationProtocol
|
||||
public void startReconfiguration() throws IOException {
|
||||
checkNNStartup();
|
||||
|
|
|
@ -22,7 +22,9 @@ import org.apache.hadoop.conf.Configured;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.util.ECPolicyLoader;
|
||||
import org.apache.hadoop.tools.TableListing;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
|
@ -135,6 +137,66 @@ public class ECAdmin extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
/** Command to add a set of erasure coding policies. */
|
||||
private static class AddECPoliciesCommand
|
||||
implements AdminHelper.Command {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "-addPolicies";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getShortUsage() {
|
||||
return "[" + getName() + " -policyFile <file>]\n";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLongUsage() {
|
||||
final TableListing listing = AdminHelper.getOptionDescriptionListing();
|
||||
listing.addRow("<file>",
|
||||
"The path of the xml file which defines the EC policies to add");
|
||||
return getShortUsage() + "\n" +
|
||||
"Add a list of erasure coding policies.\n" +
|
||||
listing.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(Configuration conf, List<String> args) throws IOException {
|
||||
final String filePath =
|
||||
StringUtils.popOptionWithArgument("-policyFile", args);
|
||||
if (filePath == null) {
|
||||
System.err.println("Please specify the path with -policyFile.\nUsage: "
|
||||
+ getLongUsage());
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (args.size() > 0) {
|
||||
System.err.println(getName() + ": Too many arguments");
|
||||
return 1;
|
||||
}
|
||||
|
||||
final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
|
||||
try {
|
||||
List<ErasureCodingPolicy> policies =
|
||||
new ECPolicyLoader().loadPolicy(filePath);
|
||||
if (policies.size() > 0) {
|
||||
AddingECPolicyResponse[] responses = dfs.addErasureCodingPolicies(
|
||||
policies.toArray(new ErasureCodingPolicy[policies.size()]));
|
||||
for (AddingECPolicyResponse response : responses) {
|
||||
System.out.println(response);
|
||||
}
|
||||
} else {
|
||||
System.out.println("No EC policy parsed out from " + filePath);
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
System.err.println(AdminHelper.prettifyException(e));
|
||||
return 2;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/** Command to get the erasure coding policy for a file or directory */
|
||||
private static class GetECPolicyCommand implements AdminHelper.Command {
|
||||
@Override
|
||||
|
@ -301,6 +363,7 @@ public class ECAdmin extends Configured implements Tool {
|
|||
|
||||
private static final AdminHelper.Command[] COMMANDS = {
|
||||
new ListECPoliciesCommand(),
|
||||
new AddECPoliciesCommand(),
|
||||
new GetECPolicyCommand(),
|
||||
new SetECPolicyCommand(),
|
||||
new UnsetECPolicyCommand()
|
||||
|
|
|
@ -178,3 +178,7 @@ Below are the details about each command.
|
|||
* `[-listPolicies]`
|
||||
|
||||
Lists the set of enabled erasure coding policies. These names are suitable for use with the `setPolicy` command.
|
||||
|
||||
* `[-addPolicies -policyFile <file>]`
|
||||
|
||||
Add a list of erasure coding policies. Please refer etc/hadoop/user_ec_policies.xml.template for the example policy file.
|
|
@ -49,6 +49,8 @@ import java.util.Set;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
|
@ -75,14 +77,18 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|||
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.net.ScriptBasedMapping;
|
||||
|
@ -1443,4 +1449,37 @@ public class TestDistributedFileSystem {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddErasureCodingPolicies() throws Exception {
|
||||
Configuration conf = getTestConfiguration();
|
||||
MiniDFSCluster cluster = null;
|
||||
|
||||
try {
|
||||
ErasureCodingPolicy policy1 =
|
||||
SystemErasureCodingPolicies.getPolicies().get(0);
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
||||
Stream.of(policy1).map(ErasureCodingPolicy::getName)
|
||||
.collect(Collectors.joining(", ")));
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
|
||||
ECSchema toAddSchema = new ECSchema("testcodec", 3, 2);
|
||||
ErasureCodingPolicy toAddPolicy =
|
||||
new ErasureCodingPolicy(toAddSchema, 128 * 1024, (byte) 254);
|
||||
ErasureCodingPolicy[] policies = new ErasureCodingPolicy[]{
|
||||
policy1, toAddPolicy};
|
||||
AddingECPolicyResponse[] responses =
|
||||
fs.addErasureCodingPolicies(policies);
|
||||
assertEquals(2, responses.length);
|
||||
assertFalse(responses[0].isSucceed());
|
||||
assertTrue(responses[1].isSucceed());
|
||||
assertTrue(responses[1].getPolicy().getId() > 0);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.protocolPB;
|
|||
|
||||
|
||||
import com.google.protobuf.UninitializedMessageException;
|
||||
import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||
|
||||
|
@ -907,6 +908,40 @@ public class TestPBHelper {
|
|||
fsServerDefaults.getKeyProviderUri());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertAddingECPolicyResponse() throws Exception {
|
||||
// Check conversion of the built-in policies.
|
||||
for (ErasureCodingPolicy policy :
|
||||
SystemErasureCodingPolicies.getPolicies()) {
|
||||
AddingECPolicyResponse response = new AddingECPolicyResponse(policy);
|
||||
HdfsProtos.AddingECPolicyResponseProto proto = PBHelperClient
|
||||
.convertAddingECPolicyResponse(response);
|
||||
// Optional fields should not be set.
|
||||
assertFalse("Unnecessary field is set.", proto.hasErrorMsg());
|
||||
// Convert proto back to an object and check for equality.
|
||||
AddingECPolicyResponse convertedResponse = PBHelperClient
|
||||
.convertAddingECPolicyResponse(proto);
|
||||
assertEquals("Converted policy not equal", response.getPolicy(),
|
||||
convertedResponse.getPolicy());
|
||||
assertEquals("Converted policy not equal", response.isSucceed(),
|
||||
convertedResponse.isSucceed());
|
||||
}
|
||||
|
||||
ErasureCodingPolicy policy = SystemErasureCodingPolicies
|
||||
.getPolicies().get(0);
|
||||
AddingECPolicyResponse response =
|
||||
new AddingECPolicyResponse(policy, "failed");
|
||||
HdfsProtos.AddingECPolicyResponseProto proto = PBHelperClient
|
||||
.convertAddingECPolicyResponse(response);
|
||||
// Convert proto back to an object and check for equality.
|
||||
AddingECPolicyResponse convertedResponse = PBHelperClient
|
||||
.convertAddingECPolicyResponse(proto);
|
||||
assertEquals("Converted policy not equal", response.getPolicy(),
|
||||
convertedResponse.getPolicy());
|
||||
assertEquals("Converted policy not equal", response.getErrorMsg(),
|
||||
convertedResponse.getErrorMsg());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertErasureCodingPolicy() throws Exception {
|
||||
// Check conversion of the built-in policies.
|
||||
|
|
|
@ -50,7 +50,7 @@ public class TestEnabledECPolicies {
|
|||
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
||||
value);
|
||||
try {
|
||||
new ErasureCodingPolicyManager(conf);
|
||||
ErasureCodingPolicyManager.getInstance().init(conf);
|
||||
fail("Expected exception when instantiating ECPolicyManager");
|
||||
} catch (IllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains("is not a valid policy", e);
|
||||
|
@ -62,7 +62,9 @@ public class TestEnabledECPolicies {
|
|||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
||||
value);
|
||||
ErasureCodingPolicyManager manager = new ErasureCodingPolicyManager(conf);
|
||||
ErasureCodingPolicyManager manager =
|
||||
ErasureCodingPolicyManager.getInstance();
|
||||
manager.init(conf);
|
||||
assertEquals("Incorrect number of enabled policies",
|
||||
numEnabled, manager.getEnabledPolicies().length);
|
||||
}
|
||||
|
@ -130,8 +132,9 @@ public class TestEnabledECPolicies {
|
|||
Arrays.asList(enabledPolicies).stream()
|
||||
.map(ErasureCodingPolicy::getName)
|
||||
.collect(Collectors.joining(", ")));
|
||||
ErasureCodingPolicyManager manager = new ErasureCodingPolicyManager(conf);
|
||||
|
||||
ErasureCodingPolicyManager manager =
|
||||
ErasureCodingPolicyManager.getInstance();
|
||||
manager.init(conf);
|
||||
// Check that returned values are unique
|
||||
Set<String> found = new HashSet<>();
|
||||
for (ErasureCodingPolicy p : manager.getEnabledPolicies()) {
|
||||
|
|
|
@ -144,6 +144,25 @@
|
|||
</comparators>
|
||||
</test>
|
||||
|
||||
<test>
|
||||
<description>help: addPolicies command</description>
|
||||
<test-commands>
|
||||
<ec-admin-command>-fs NAMENODE -help addPolicies</ec-admin-command>
|
||||
</test-commands>
|
||||
<cleanup-commands>
|
||||
</cleanup-commands>
|
||||
<comparators>
|
||||
<comparator>
|
||||
<type>SubstringComparator</type>
|
||||
<expected-output>Add a list of erasure coding policies</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>SubstringComparator</type>
|
||||
<expected-output>[-addPolicies -policyFile <file>]</expected-output>
|
||||
</comparator>
|
||||
</comparators>
|
||||
</test>
|
||||
|
||||
<!-- Test erasure code commands -->
|
||||
<test>
|
||||
<description>setPolicy : set erasure coding policy on a directory to encode files</description>
|
||||
|
@ -311,6 +330,25 @@
|
|||
</comparators>
|
||||
</test>
|
||||
|
||||
<test>
|
||||
<description>addPolicies : add a list of ECPolicies</description>
|
||||
<test-commands>
|
||||
<ec-admin-command>-fs NAMENODE -addPolicies -policyFile CLITEST_DATA/test_ec_policies.xml</ec-admin-command>
|
||||
</test-commands>
|
||||
<cleanup-commands>
|
||||
</cleanup-commands>
|
||||
<comparators>
|
||||
<comparator>
|
||||
<type>SubstringComparator</type>
|
||||
<expected-output>Add ErasureCodingPolicy XOR-2-1-128k succeed</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>SubstringComparator</type>
|
||||
<expected-output>Add ErasureCodingPolicy RS-6-3-64k failed</expected-output>
|
||||
</comparator>
|
||||
</comparators>
|
||||
</test>
|
||||
|
||||
<!-- Test illegal parameters -->
|
||||
<test>
|
||||
<description>setPolicy : illegal parameters - path is missing</description>
|
||||
|
@ -473,5 +511,35 @@
|
|||
</comparators>
|
||||
</test>
|
||||
|
||||
<test>
|
||||
<description>addPolicies : illegal parameters - policyFile is missing</description>
|
||||
<test-commands>
|
||||
<ec-admin-command>-fs NAMENODE -addPolicies /etc</ec-admin-command>
|
||||
</test-commands>
|
||||
<cleanup-commands>
|
||||
</cleanup-commands>
|
||||
<comparators>
|
||||
<comparator>
|
||||
<type>SubstringComparator</type>
|
||||
<expected-output>Please specify the path with -policyFile</expected-output>
|
||||
</comparator>
|
||||
</comparators>
|
||||
</test>
|
||||
|
||||
<test>
|
||||
<description>addPolicies : illegal parameters - too many parameters</description>
|
||||
<test-commands>
|
||||
<ec-admin-command>-fs NAMENODE -addPolicies -policyFile /ecdir /ecdir2</ec-admin-command>
|
||||
</test-commands>
|
||||
<cleanup-commands>
|
||||
</cleanup-commands>
|
||||
<comparators>
|
||||
<comparator>
|
||||
<type>SubstringComparator</type>
|
||||
<expected-output>-addPolicies: Too many arguments</expected-output>
|
||||
</comparator>
|
||||
</comparators>
|
||||
</test>
|
||||
|
||||
</tests>
|
||||
</configuration>
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
<?xml version="1.0"?>
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!--
|
||||
This is the template for user-defined EC policies configuration.
|
||||
All policies and schemas are defined within the 'configuration' tag
|
||||
which is the top level element for this XML document. The 'layoutversion'
|
||||
tag contains the version of EC policy XML file format, and user-defined EC
|
||||
schemas are included within the 'schemas' tag. The 'policies' tag
|
||||
contains all the user defined EC policies, and each policy consists of
|
||||
schema id and cellsize.
|
||||
-->
|
||||
<configuration>
|
||||
<!-- The version of EC policy XML file format, it must be an integer -->
|
||||
<layoutversion>1</layoutversion>
|
||||
<schemas>
|
||||
<!-- schema id is only used to reference internally in this document -->
|
||||
<schema id="XORk2m1">
|
||||
<!-- The combination of codec, k, m and options as the schema ID, defines
|
||||
a unique schema, for example 'xor-2-1'. schema ID is case insensitive -->
|
||||
<!-- codec with this specific name should exist already in this system -->
|
||||
<codec>xor</codec>
|
||||
<k>2</k>
|
||||
<m>1</m>
|
||||
<options> </options>
|
||||
</schema>
|
||||
<schema id="RSk6m3">
|
||||
<codec>RS</codec>
|
||||
<k>6</k>
|
||||
<m>3</m>
|
||||
<options> </options>
|
||||
</schema>
|
||||
</schemas>
|
||||
<policies>
|
||||
<policy>
|
||||
<!-- the combination of schema ID and cellsize(in unit k) defines a unique
|
||||
policy, for example 'xor-2-1-256k', case insensitive -->
|
||||
<!-- schema is referred by its id -->
|
||||
<schema>XORk2m1</schema>
|
||||
<!-- cellsize must be an positive integer multiple of 1024(1k) -->
|
||||
<cellsize>131072</cellsize>
|
||||
</policy>
|
||||
<policy>
|
||||
<schema>RSk6m3</schema>
|
||||
<cellsize>65536</cellsize>
|
||||
</policy>
|
||||
</policies>
|
||||
</configuration>
|
Loading…
Reference in New Issue