HDFS-15117. EC: Add getECTopologyResultForPolicies to DistributedFileSystem. Contributed by Ayush Saxena

This commit is contained in:
Ayush Saxena 2020-01-23 18:18:34 +05:30
parent 9520b2ad79
commit 92c58901d7
22 changed files with 438 additions and 96 deletions

View File

@ -119,6 +119,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@ -2808,6 +2809,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
public ECTopologyVerifierResult getECTopologyResultForPolicies(
final String... policyNames) throws IOException {
checkOpen();
try {
return namenode.getECTopologyResultForPolicies(policyNames);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
SafeModeException.class);
}
}
public void setXAttr(String src, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
checkOpen();

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
@ -84,6 +83,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@ -3198,6 +3198,18 @@ public class DistributedFileSystem extends FileSystem
}.resolve(this, absF);
}
/**
* Verifies if the given policies are supported in the given cluster setup.
* If not policy is specified checks for all enabled policies.
* @param policyNames name of policies.
* @return the result if the given policies are supported in the cluster setup
* @throws IOException
*/
public ECTopologyVerifierResult getECTopologyResultForPolicies(
final String... policyNames) throws IOException {
return dfs.getECTopologyResultForPolicies(policyNames);
}
/**
* Get the root directory of Trash for a path in HDFS.
* 1. File in encryption zone returns /ez1/.Trash/username

View File

@ -1761,6 +1761,18 @@ public interface ClientProtocol {
@AtMostOnce
void unsetErasureCodingPolicy(String src) throws IOException;
/**
* Verifies if the given policies are supported in the given cluster setup.
* If not policy is specified checks for all enabled policies.
* @param policyNames name of policies.
* @return the result if the given policies are supported in the cluster setup
* @throws IOException
*/
@Idempotent
@ReadOnly
ECTopologyVerifierResult getECTopologyResultForPolicies(String... policyNames)
throws IOException;
/**
* Get {@link QuotaUsage} rooted at the specified directory.
*

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
@ -221,6 +222,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodin
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CodecProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BatchedDirectoryListingProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECTopologyResultForPoliciesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECTopologyResultForPoliciesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
@ -1665,10 +1668,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public void unsetErasureCodingPolicy(String src)
throws IOException {
public void unsetErasureCodingPolicy(String src) throws IOException {
final UnsetErasureCodingPolicyRequestProto.Builder builder =
ErasureCodingProtos.UnsetErasureCodingPolicyRequestProto.newBuilder();
UnsetErasureCodingPolicyRequestProto.newBuilder();
builder.setSrc(src);
UnsetErasureCodingPolicyRequestProto req = builder.build();
try {
@ -1678,6 +1680,23 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
}
@Override
public ECTopologyVerifierResult getECTopologyResultForPolicies(
final String... policyNames) throws IOException {
final GetECTopologyResultForPoliciesRequestProto.Builder builder =
GetECTopologyResultForPoliciesRequestProto.newBuilder();
builder.addAllPolicies(Arrays.asList(policyNames));
GetECTopologyResultForPoliciesRequestProto req = builder.build();
try {
GetECTopologyResultForPoliciesResponseProto response =
rpcProxy.getECTopologyResultForPolicies(null, req);
return PBHelperClient
.convertECTopologyVerifierResultProto(response.getResponse());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void reencryptEncryptionZone(String zone, ReencryptAction action)
throws IOException {

View File

@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
@ -3318,6 +3319,21 @@ public class PBHelperClient {
return builder.build();
}
public static ECTopologyVerifierResult convertECTopologyVerifierResultProto(
HdfsProtos.ECTopologyVerifierResultProto resp) {
return new ECTopologyVerifierResult(resp.getIsSupported(),
resp.getResultMessage());
}
public static HdfsProtos.ECTopologyVerifierResultProto convertECTopologyVerifierResult(
ECTopologyVerifierResult resp) {
final HdfsProtos.ECTopologyVerifierResultProto.Builder builder =
HdfsProtos.ECTopologyVerifierResultProto.newBuilder()
.setIsSupported(resp.isSupported())
.setResultMessage(resp.getResultMessage());
return builder.build();
}
public static EnumSet<AddBlockFlag> convertAddBlockFlags(
List<AddBlockFlagProto> addBlockFlags) {
EnumSet<AddBlockFlag> flags =

View File

@ -1030,6 +1030,8 @@ service ClientNamenodeProtocol {
returns(SetErasureCodingPolicyResponseProto);
rpc unsetErasureCodingPolicy(UnsetErasureCodingPolicyRequestProto)
returns(UnsetErasureCodingPolicyResponseProto);
rpc getECTopologyResultForPolicies(GetECTopologyResultForPoliciesRequestProto)
returns(GetECTopologyResultForPoliciesResponseProto);
rpc getCurrentEditLogTxid(GetCurrentEditLogTxidRequestProto)
returns(GetCurrentEditLogTxidResponseProto);
rpc getEditsFromTxid(GetEditsFromTxidRequestProto)

View File

@ -89,6 +89,14 @@ message UnsetErasureCodingPolicyRequestProto {
message UnsetErasureCodingPolicyResponseProto {
}
message GetECTopologyResultForPoliciesRequestProto {
repeated string policies = 1;
}
message GetECTopologyResultForPoliciesResponseProto {
required ECTopologyVerifierResultProto response = 1;
}
/**
* Block erasure coding reconstruction info
*/

View File

@ -418,6 +418,11 @@ message AddErasureCodingPolicyResponseProto {
optional string errorMsg = 3;
}
message ECTopologyVerifierResultProto {
required string resultMessage = 1;
required bool isSupported = 2;
}
/**
* Placeholder type for consistent HDFS operations.
*/

View File

@ -74,7 +74,8 @@ public class TestReadOnly {
"getEditsFromTxid",
"getQuotaUsage",
"msync",
"getHAServiceState"
"getHAServiceState",
"getECTopologyResultForPolicies"
)
);

View File

@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@ -178,6 +179,27 @@ public class ErasureCoding {
}
}
public ECTopologyVerifierResult getECTopologyResultForPolicies(
String[] policyNames) throws IOException {
RemoteMethod method = new RemoteMethod("getECTopologyResultForPolicies",
new Class<?>[] {String[].class}, new Object[] {policyNames});
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
if (nss.isEmpty()) {
throw new IOException("No namespace availaible.");
}
Map<FederationNamespaceInfo, ECTopologyVerifierResult> ret = rpcClient
.invokeConcurrent(nss, method, true, false,
ECTopologyVerifierResult.class);
for (Map.Entry<FederationNamespaceInfo, ECTopologyVerifierResult> entry : ret
.entrySet()) {
if (!entry.getValue().isSupported()) {
return entry.getValue();
}
}
// If no negative result, return the result from the first namespace.
return ret.get(nss.iterator().next());
}
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
rpcServer.checkOperation(OperationCategory.READ);

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
@ -1716,6 +1717,13 @@ public class RouterClientProtocol implements ClientProtocol {
erasureCoding.unsetErasureCodingPolicy(src);
}
@Override
public ECTopologyVerifierResult getECTopologyResultForPolicies(
String... policyNames) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED, true);
return erasureCoding.getECTopologyResultForPolicies(policyNames);
}
@Override
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
return erasureCoding.getECBlockGroupStats();

View File

@ -77,6 +77,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
@ -1309,6 +1310,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
clientProto.unsetErasureCodingPolicy(src);
}
@Override
public ECTopologyVerifierResult getECTopologyResultForPolicies(
String... policyNames) throws IOException {
return clientProto.getECTopologyResultForPolicies(policyNames);
}
@Override // ClientProtocol
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
return clientProto.getECBlockGroupStats();

View File

@ -123,6 +123,8 @@ public class MiniRouterDFSCluster {
private int numDatanodesPerNameservice = 2;
/** Custom storage type for each datanode. */
private StorageType[][] storageTypes = null;
/** Racks for datanodes. */
private String[] racks = null;
/** Mini cluster. */
private MiniDFSCluster cluster;
@ -638,6 +640,14 @@ public class MiniRouterDFSCluster {
this.storageTypes = storageTypes;
}
/**
* Set racks for each datanode. If racks is uninitialized or passed null then
* default is used.
*/
public void setRacks(String[] racks) {
this.racks = racks;
}
/**
* Set the DNs to belong to only one subcluster.
*/
@ -794,6 +804,7 @@ public class MiniRouterDFSCluster {
.nnTopology(topology)
.dataNodeConfOverlays(dnConfs)
.storageTypes(storageTypes)
.racks(racks)
.build();
cluster.waitActive();

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Test class with clusters having multiple racks.
*/
public class TestRouterMultiRack {
private static StateStoreDFSCluster cluster;
private static RouterContext routerContext;
private static DistributedFileSystem routerFs;
private static NamenodeContext nnContext0;
private static NamenodeContext nnContext1;
private static DistributedFileSystem nnFs0;
private static DistributedFileSystem nnFs1;
@BeforeClass
public static void setUp() throws Exception {
// Build and start a federated cluster
cluster = new StateStoreDFSCluster(false, 2,
MultipleDestinationMountTableResolver.class);
Configuration routerConf =
new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
Configuration hdfsConf = new Configuration(false);
cluster.addNamenodeOverrides(hdfsConf);
cluster.addRouterOverrides(routerConf);
cluster.setNumDatanodesPerNameservice(9);
cluster.setIndependentDNs();
cluster.setRacks(
new String[] {"/rack1", "/rack1", "/rack1", "/rack2", "/rack2",
"/rack2", "/rack3", "/rack3", "/rack3", "/rack4", "/rack4",
"/rack4", "/rack5", "/rack5", "/rack5", "/rack6", "/rack6",
"/rack6"});
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
routerContext = cluster.getRandomRouter();
routerFs = (DistributedFileSystem) routerContext.getFileSystem();
nnContext0 = cluster.getNamenode("ns0", null);
nnContext1 = cluster.getNamenode("ns1", null);
nnFs0 = (DistributedFileSystem) nnContext0.getFileSystem();
nnFs1 = (DistributedFileSystem) nnContext1.getFileSystem();
}
@AfterClass
public static void tearDown() {
if (cluster != null) {
cluster.stopRouter(routerContext);
cluster.shutdown();
cluster = null;
}
}
@Test
public void testGetECTopologyResultForPolicies() throws IOException {
routerFs.enableErasureCodingPolicy("RS-6-3-1024k");
// No policies specified should return result for the enabled policy.
ECTopologyVerifierResult result = routerFs.getECTopologyResultForPolicies();
assertTrue(result.isSupported());
// Specified policy requiring more datanodes than present in
// the actual cluster.
result = routerFs.getECTopologyResultForPolicies("RS-10-4-1024k");
assertFalse(result.isSupported());
// Specify multiple policies with one policy requiring more datanodes than
// present in the actual cluster
result = routerFs
.getECTopologyResultForPolicies("RS-10-4-1024k", "RS-3-2-1024k");
assertFalse(result.isSupported());
// Specify multiple policies that require datanodes equal or less then
// present in the actual cluster
result = routerFs
.getECTopologyResultForPolicies("XOR-2-1-1024k", "RS-3-2-1024k");
assertTrue(result.isSupported());
// Specify multiple policies with one policy requiring more datanodes than
// present in the actual cluster
result = routerFs
.getECTopologyResultForPolicies("RS-10-4-1024k", "RS-3-2-1024k");
assertFalse(result.isSupported());
// Enable a policy requiring more datanodes than present in
// the actual cluster.
routerFs.enableErasureCodingPolicy("RS-10-4-1024k");
result = routerFs.getECTopologyResultForPolicies();
assertFalse(result.isSupported());
// Check without specifying any policy, with one cluster having
// all supported, but one cluster having one unsupported policy. The
nnFs0.disableErasureCodingPolicy("RS-10-4-1024k");
nnFs1.enableErasureCodingPolicy("RS-10-4-1024k");
result = routerFs.getECTopologyResultForPolicies();
assertFalse(result.isSupported());
nnFs1.disableErasureCodingPolicy("RS-10-4-1024k");
nnFs0.enableErasureCodingPolicy("RS-10-4-1024k");
result = routerFs.getECTopologyResultForPolicies();
assertFalse(result.isSupported());
}
}

View File

@ -26,6 +26,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import com.google.protobuf.ByteString;
import com.google.protobuf.ProtocolStringList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
@ -50,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
@ -267,6 +269,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCod
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECTopologyResultForPoliciesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECTopologyResultForPoliciesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BatchedDirectoryListingProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
@ -1679,6 +1683,24 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
}
@Override
public GetECTopologyResultForPoliciesResponseProto getECTopologyResultForPolicies(
RpcController controller, GetECTopologyResultForPoliciesRequestProto req)
throws ServiceException {
try {
ProtocolStringList policies = req.getPoliciesList();
ECTopologyVerifierResult result = server.getECTopologyResultForPolicies(
policies.toArray(policies.toArray(new String[policies.size()])));
GetECTopologyResultForPoliciesResponseProto.Builder builder =
GetECTopologyResultForPoliciesResponseProto.newBuilder();
builder
.setResponse(PBHelperClient.convertECTopologyVerifierResult(result));
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public SetXAttrResponseProto setXAttr(RpcController controller,
SetXAttrRequestProto req) throws ServiceException {

View File

@ -18,12 +18,12 @@ package org.apache.hadoop.hdfs.server.common;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.namenode.ECTopologyVerifierResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@ -52,7 +52,8 @@ public final class ECTopologyVerifier {
* @return the status of the verification
*/
public static ECTopologyVerifierResult getECTopologyVerifierResult(
final DatanodeInfo[] report, final ErasureCodingPolicy... policies) {
final DatanodeInfo[] report,
final Collection<ErasureCodingPolicy> policies) {
final int numOfRacks = getNumberOfRacks(report);
return getECTopologyVerifierResult(numOfRacks, report.length, policies);
}
@ -60,14 +61,14 @@ public final class ECTopologyVerifier {
/**
* Verifies whether the cluster setup can support all enabled EC policies.
*
* @param policies erasure coding policies to verify
* @param numOfRacks number of racks
* @param numOfDataNodes number of data nodes
* @param policies erasure coding policies to verify
* @return the status of the verification
*/
public static ECTopologyVerifierResult getECTopologyVerifierResult(
final int numOfRacks, final int numOfDataNodes,
final ErasureCodingPolicy... policies) {
final Collection<ErasureCodingPolicy> policies) {
int minDN = 0;
int minRack = 0;
for (ErasureCodingPolicy policy: policies) {
@ -127,10 +128,8 @@ public final class ECTopologyVerifier {
}
private static String getReadablePolicies(
final ErasureCodingPolicy... policies) {
return Arrays.asList(policies)
.stream()
.map(policyInfo -> policyInfo.getName())
final Collection<ErasureCodingPolicy> policies) {
return policies.stream().map(policyInfo -> policyInfo.getName())
.collect(Collectors.joining(", "));
}
}

View File

@ -68,7 +68,7 @@ final class FSDirErasureCodingOp {
* @return an erasure coding policy if ecPolicyName is valid and enabled
* @throws IOException
*/
static ErasureCodingPolicy getErasureCodingPolicyByName(
static ErasureCodingPolicy getEnabledErasureCodingPolicyByName(
final FSNamesystem fsn, final String ecPolicyName) throws IOException {
assert fsn.hasReadLock();
ErasureCodingPolicy ecPolicy = fsn.getErasureCodingPolicyManager()
@ -92,6 +92,27 @@ final class FSDirErasureCodingOp {
return ecPolicy;
}
/**
* Check if the ecPolicyName is valid, return the corresponding
* EC policy if is, including the REPLICATION EC policy.
* @param fsn namespace
* @param ecPolicyName name of EC policy to be checked
* @return an erasure coding policy if ecPolicyName is valid
* @throws IOException
*/
static ErasureCodingPolicy getErasureCodingPolicyByName(
final FSNamesystem fsn, final String ecPolicyName) throws IOException {
assert fsn.hasReadLock();
ErasureCodingPolicy ecPolicy = fsn.getErasureCodingPolicyManager()
.getErasureCodingPolicyByName(ecPolicyName);
if (ecPolicy == null) {
throw new HadoopIllegalArgumentException(
"The given erasure coding " + "policy " + ecPolicyName
+ " does not exist.");
}
return ecPolicy;
}
/**
* Set an erasure coding policy on the given path.
*
@ -118,7 +139,7 @@ final class FSDirErasureCodingOp {
List<XAttr> xAttrs;
fsd.writeLock();
try {
ErasureCodingPolicy ecPolicy = getErasureCodingPolicyByName(fsn,
ErasureCodingPolicy ecPolicy = getEnabledErasureCodingPolicyByName(fsn,
ecPolicyName);
iip = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
// Write access is required to set erasure coding policy
@ -374,7 +395,7 @@ final class FSDirErasureCodingOp {
String ecPolicyName, INodesInPath iip) throws IOException {
ErasureCodingPolicy ecPolicy;
if (!StringUtils.isEmpty(ecPolicyName)) {
ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicyByName(
ecPolicy = FSDirErasureCodingOp.getEnabledErasureCodingPolicyByName(
fsn, ecPolicyName);
} else {
ecPolicy = FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(

View File

@ -91,6 +91,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LI
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSUtil.isParentEntry;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
@ -7980,6 +7981,48 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
logAuditEvent(true, operationName, srcArg, null, resultingStat);
}
/**
* Verifies if the given policies are supported in the given cluster setup.
* If not policy is specified checks for all enabled policies.
* @param policyNames name of policies.
* @return the result if the given policies are supported in the cluster setup
* @throws IOException
*/
public ECTopologyVerifierResult getECTopologyResultForPolicies(
String[] policyNames) throws IOException {
String operationName = "getECTopologyResultForPolicies";
checkSuperuserPrivilege(operationName);
checkOperation(OperationCategory.UNCHECKED);
ECTopologyVerifierResult result;
readLock();
try {
checkOperation(OperationCategory.UNCHECKED);
// If no policy name is specified return the result
// for all enabled policies.
if (policyNames == null || policyNames.length == 0) {
result = getEcTopologyVerifierResultForEnabledPolicies();
} else {
Collection<ErasureCodingPolicy> policies =
new ArrayList<ErasureCodingPolicy>();
for (int i = 0; i < policyNames.length; i++) {
policies.add(FSDirErasureCodingOp
.getErasureCodingPolicyByName(this, policyNames[i]));
}
int numOfDataNodes =
getBlockManager().getDatanodeManager().getNumOfDataNodes();
int numOfRacks =
getBlockManager().getDatanodeManager().getNetworkTopology()
.getNumOfRacks();
result = ECTopologyVerifier
.getECTopologyVerifierResult(numOfRacks, numOfDataNodes, policies);
}
} finally {
readUnlock();
}
logAuditEvent(true, operationName, null);
return result;
}
/**
* Get the erasure coding policy information for specified path
*/
@ -8385,15 +8428,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Override // NameNodeMXBean
public String getVerifyECWithTopologyResult() {
int numOfDataNodes = getBlockManager().getDatanodeManager()
.getNumOfDataNodes();
int numOfRacks = getBlockManager().getDatanodeManager()
.getNetworkTopology().getNumOfRacks();
ErasureCodingPolicy[] enabledEcPolicies =
getErasureCodingPolicyManager().getCopyOfEnabledPolicies();
ECTopologyVerifierResult result =
ECTopologyVerifier.getECTopologyVerifierResult(
numOfRacks, numOfDataNodes, enabledEcPolicies);
getEcTopologyVerifierResultForEnabledPolicies();
Map<String, String> resultMap = new HashMap<String, String>();
resultMap.put("isSupported", Boolean.toString(result.isSupported()));
@ -8401,6 +8437,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return JSON.toString(resultMap);
}
private ECTopologyVerifierResult getEcTopologyVerifierResultForEnabledPolicies() {
int numOfDataNodes =
getBlockManager().getDatanodeManager().getNumOfDataNodes();
int numOfRacks = getBlockManager().getDatanodeManager().getNetworkTopology()
.getNumOfRacks();
ErasureCodingPolicy[] enabledEcPolicies =
getErasureCodingPolicyManager().getCopyOfEnabledPolicies();
return ECTopologyVerifier
.getECTopologyVerifierResult(numOfRacks, numOfDataNodes,
Arrays.asList(enabledEcPolicies));
}
// This method logs operatoinName without super user privilege.
// It should be called without holding FSN lock.
void checkSuperuserPrivilege(String operationName)

View File

@ -102,6 +102,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
@ -2530,6 +2531,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
}
@Override
public ECTopologyVerifierResult getECTopologyResultForPolicies(
String... policyNames) throws IOException {
return namesystem.getECTopologyResultForPolicies(policyNames);
}
@Override
public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
ErasureCodingPolicy[] policies) throws IOException {

View File

@ -26,15 +26,13 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.NoECPolicySetException;
import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier;
import org.apache.hadoop.hdfs.server.namenode.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.util.ECPolicyLoader;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.tools.TableListing;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@ -536,14 +534,14 @@ public class ECAdmin extends Configured implements Tool {
final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
try {
dfs.enableErasureCodingPolicy(ecPolicyName);
System.out.println("Erasure coding policy " + ecPolicyName +
" is enabled");
System.out
.println("Erasure coding policy " + ecPolicyName + " is enabled");
ECTopologyVerifierResult result =
getECTopologyVerifierResultForPolicy(dfs, ecPolicyName);
dfs.getECTopologyResultForPolicies(ecPolicyName);
if (!result.isSupported()) {
System.err.println("Warning: The cluster setup does not support " +
"EC policy " + ecPolicyName + ". Reason: " +
result.getResultMessage());
System.err.println(
"Warning: The cluster setup does not support " + "EC policy "
+ ecPolicyName + ". Reason: " + result.getResultMessage());
}
} catch (IOException e) {
System.err.println(AdminHelper.prettifyException(e));
@ -630,14 +628,21 @@ public class ECAdmin extends Configured implements Tool {
public int run(Configuration conf, List<String> args) throws IOException {
boolean isPolicyOption = StringUtils.popOption("-policy", args);
final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
ECTopologyVerifierResult result;
ECTopologyVerifierResult result = null;
if (isPolicyOption) {
CommandFormat c = new CommandFormat(1, Integer.MAX_VALUE);
c.parse(args);
String[] parameters = args.toArray(new String[args.size()]);
result = getECTopologyResultForPolicies(dfs, parameters);
try {
result = dfs.getECTopologyResultForPolicies(parameters);
} catch (RemoteException e) {
if (e.getClassName().contains("HadoopIllegalArgumentException")) {
throw new HadoopIllegalArgumentException(e.getMessage());
}
throw e;
}
} else {
result = getECTopologyVerifierResult(dfs);
result = dfs.getECTopologyResultForPolicies();
}
System.out.println(result.getResultMessage());
if (result.isSupported()) {
@ -647,62 +652,6 @@ public class ECAdmin extends Configured implements Tool {
}
}
private static ECTopologyVerifierResult getECTopologyVerifierResult(
final DistributedFileSystem dfs) throws IOException {
final ErasureCodingPolicyInfo[] policies =
dfs.getClient().getNamenode().getErasureCodingPolicies();
final DatanodeInfo[] report = dfs.getClient().getNamenode()
.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
return ECTopologyVerifier.getECTopologyVerifierResult(report,
getEnabledPolicies(policies));
}
private static ECTopologyVerifierResult getECTopologyResultForPolicies(
final DistributedFileSystem dfs, final String... policyNames)
throws IOException {
ErasureCodingPolicy[] policies =
new ErasureCodingPolicy[policyNames.length];
for (int i = 0; i < policyNames.length; i++) {
policies[i] =
getPolicy(dfs.getClient().getNamenode().getErasureCodingPolicies(),
policyNames[i]);
}
final DatanodeInfo[] report = dfs.getClient().getNamenode()
.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
return ECTopologyVerifier.getECTopologyVerifierResult(report, policies);
}
private static ECTopologyVerifierResult getECTopologyVerifierResultForPolicy(
final DistributedFileSystem dfs, final String policyName)
throws IOException {
final ErasureCodingPolicy policy =
getPolicy(dfs.getClient().getNamenode().getErasureCodingPolicies(),
policyName);
final DatanodeInfo[] report = dfs.getClient().getNamenode()
.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
return ECTopologyVerifier.getECTopologyVerifierResult(report, policy);
}
private static ErasureCodingPolicy getPolicy(
final ErasureCodingPolicyInfo[] policies, final String policyName) {
for (ErasureCodingPolicyInfo policy : policies) {
if (policyName.equals(policy.getPolicy().getName())) {
return policy.getPolicy();
}
}
throw new HadoopIllegalArgumentException("The given erasure coding " +
"policy " + policyName + " does not exist.");
}
private static ErasureCodingPolicy[] getEnabledPolicies(
final ErasureCodingPolicyInfo[] policies) {
return Arrays.asList(policies).stream()
.filter(policyInfo -> policyInfo.isEnabled())
.map(ErasureCodingPolicyInfo::getPolicy)
.toArray(ErasureCodingPolicy[]::new);
}
private static final AdminHelper.Command[] COMMANDS = {
new ListECPoliciesCommand(),
new AddECPoliciesCommand(),

View File

@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -2057,4 +2058,35 @@ public class TestDistributedFileSystem {
assertEquals("Number of SSD should be 1 but was : " + numSSD, 1, numSSD);
}
}
@Test
public void testGetECTopologyResultForPolicies() throws Exception {
Configuration conf = new HdfsConfiguration();
try (MiniDFSCluster cluster = DFSTestUtil.setupCluster(conf, 9, 3, 0)) {
DistributedFileSystem dfs = cluster.getFileSystem();
dfs.enableErasureCodingPolicy("RS-6-3-1024k");
// No policies specified should return result for the enabled policy.
ECTopologyVerifierResult result = dfs.getECTopologyResultForPolicies();
assertTrue(result.isSupported());
// Specified policy requiring more datanodes than present in
// the actual cluster.
result = dfs.getECTopologyResultForPolicies("RS-10-4-1024k");
assertFalse(result.isSupported());
// Specify multiple policies that require datanodes equlal or less then
// present in the actual cluster
result =
dfs.getECTopologyResultForPolicies("XOR-2-1-1024k", "RS-3-2-1024k");
assertTrue(result.isSupported());
// Specify multiple policies with one policy requiring more datanodes than
// present in the actual cluster
result =
dfs.getECTopologyResultForPolicies("RS-10-4-1024k", "RS-3-2-1024k");
assertFalse(result.isSupported());
// Enable a policy requiring more datanodes than present in
// the actual cluster.
dfs.enableErasureCodingPolicy("RS-10-4-1024k");
result = dfs.getECTopologyResultForPolicies();
assertFalse(result.isSupported());
}
}
}