HDFS-8023. Erasure Coding: retrieve eraure coding schema for a file from NameNode (Contributed by Vinayakumar B)
This commit is contained in:
parent
bff59392ee
commit
16484f0030
|
@ -49,4 +49,7 @@
|
|||
(Hui Zheng via Zhe Zhang)
|
||||
|
||||
HDFS-7839. Erasure coding: implement facilities in NameNode to create and
|
||||
manage EC zones (Zhe Zhang)
|
||||
manage EC zones (Zhe Zhang)
|
||||
|
||||
HDFS-8023. Erasure Coding: retrieve eraure coding schema for a file from
|
||||
NameNode (vinayakumarb)
|
|
@ -119,6 +119,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ECInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -3135,6 +3136,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
}
|
||||
}
|
||||
|
||||
public ECInfo getErasureCodingInfo(String src) throws IOException {
|
||||
checkOpen();
|
||||
TraceScope scope = getPathTraceScope("getErasureCodingInfo", src);
|
||||
try {
|
||||
return namenode.getErasureCodingInfo(src);
|
||||
} catch (RemoteException re) {
|
||||
throw re.unwrapRemoteException(AccessControlException.class,
|
||||
FileNotFoundException.class, UnresolvedPathException.class);
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
|
||||
checkOpen();
|
||||
return new DFSInotifyEventInputStream(traceSampler, namenode);
|
||||
|
|
|
@ -1464,4 +1464,14 @@ public interface ClientProtocol {
|
|||
*/
|
||||
@Idempotent
|
||||
public EventBatchList getEditsFromTxid(long txid) throws IOException;
|
||||
|
||||
/**
|
||||
* Gets the ECInfo for the specified file/directory
|
||||
*
|
||||
* @param src
|
||||
* @return Returns the ECInfo if the file/directory is erasure coded, null otherwise
|
||||
* @throws IOException
|
||||
*/
|
||||
@Idempotent
|
||||
public ECInfo getErasureCodingInfo(String src) throws IOException;
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
|||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ECInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
|
@ -108,6 +109,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
|
||||
|
@ -1511,4 +1514,20 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
|||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetErasureCodingInfoResponseProto getErasureCodingInfo(RpcController controller,
|
||||
GetErasureCodingInfoRequestProto request) throws ServiceException {
|
||||
try {
|
||||
ECInfo ecInfo = server.getErasureCodingInfo(request.getSrc());
|
||||
GetErasureCodingInfoResponseProto.Builder resBuilder = GetErasureCodingInfoResponseProto
|
||||
.newBuilder();
|
||||
if (ecInfo != null) {
|
||||
resBuilder.setECInfo(PBHelper.convertECInfo(ecInfo));
|
||||
}
|
||||
return resBuilder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ECInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
|
@ -107,6 +108,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
|
||||
|
@ -1532,4 +1535,19 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ECInfo getErasureCodingInfo(String src) throws IOException {
|
||||
GetErasureCodingInfoRequestProto req = GetErasureCodingInfoRequestProto.newBuilder()
|
||||
.setSrc(src).build();
|
||||
try {
|
||||
GetErasureCodingInfoResponseProto res = rpcProxy.getErasureCodingInfo(null, req);
|
||||
if (res.hasECInfo()) {
|
||||
return PBHelper.convertECInfo(res.getECInfo());
|
||||
}
|
||||
return null;
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,11 @@ import java.io.InputStream;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
|
@ -77,6 +81,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.ECInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
|
@ -146,6 +151,9 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeLocalInfoProto;
|
|||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto.StorageState;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FsPermissionProto;
|
||||
|
@ -226,6 +234,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
|
|||
import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
@ -3095,4 +3104,41 @@ public class PBHelper {
|
|||
setId(context.getReportId()).
|
||||
build();
|
||||
}
|
||||
|
||||
public static ECInfo convertECInfo(ECInfoProto ecInfoProto) {
|
||||
return new ECInfo(ecInfoProto.getSrc(),
|
||||
convertECSchema(ecInfoProto.getSchema()));
|
||||
}
|
||||
|
||||
public static ECInfoProto convertECInfo(ECInfo ecInfo) {
|
||||
return ECInfoProto.newBuilder().setSrc(ecInfo.getSrc())
|
||||
.setSchema(convertECSchema(ecInfo.getSchema())).build();
|
||||
}
|
||||
|
||||
public static ECSchema convertECSchema(ECSchemaProto schema) {
|
||||
List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList();
|
||||
Map<String, String> options = new HashMap<>(optionsList.size());
|
||||
for (ECSchemaOptionEntryProto option : optionsList) {
|
||||
options.put(option.getKey(), option.getValue());
|
||||
}
|
||||
// include chunksize in options.
|
||||
options.put(ECSchema.CHUNK_SIZE_KEY, String.valueOf(schema.getChunkSize()));
|
||||
return new ECSchema(schema.getSchemaName(), schema.getCodecName(),
|
||||
schema.getDataUnits(), schema.getParityUnits(), options);
|
||||
}
|
||||
|
||||
public static ECSchemaProto convertECSchema(ECSchema schema) {
|
||||
ECSchemaProto.Builder builder = ECSchemaProto.newBuilder()
|
||||
.setSchemaName(schema.getSchemaName())
|
||||
.setCodecName(schema.getCodecName())
|
||||
.setDataUnits(schema.getNumDataUnits())
|
||||
.setParityUnits(schema.getNumParityUnits())
|
||||
.setChunkSize(schema.getChunkSize());
|
||||
Set<Entry<String, String>> entrySet = schema.getOptions().entrySet();
|
||||
for (Entry<String, String> entry : entrySet) {
|
||||
builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
|
||||
.setKey(entry.getKey()).setValue(entry.getValue()).build());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -180,6 +180,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ECInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
|
@ -262,6 +263,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.ipc.RetryCache;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
|
@ -7554,6 +7556,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
logAuditEvent(true, "createErasureCodingZone", srcArg, null, resultingStat);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the erasure coding information for specified src
|
||||
*/
|
||||
ECInfo getErasureCodingInfo(String src) throws AccessControlException,
|
||||
UnresolvedLinkException, IOException {
|
||||
checkOperation(OperationCategory.READ);
|
||||
final byte[][] pathComponents = FSDirectory
|
||||
.getPathComponentsForReservedPath(src);
|
||||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.READ);
|
||||
src = dir.resolvePath(pc, src, pathComponents);
|
||||
final INodesInPath iip = dir.getINodesInPath(src, true);
|
||||
if (isPermissionEnabled) {
|
||||
dir.checkPathAccess(pc, iip, FsAction.READ);
|
||||
}
|
||||
if (dir.getECPolicy(iip)) {
|
||||
// TODO HDFS-8074 and HDFS-7859 : To get from loaded schemas
|
||||
Map<String, String> options = new HashMap<String, String>();
|
||||
ECSchema defaultSchema = new ECSchema("RS-6-3", "rs", 6, 3, options);
|
||||
return new ECInfo(src, defaultSchema);
|
||||
}
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
|
||||
boolean logRetryCache)
|
||||
throws IOException {
|
||||
|
|
|
@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ECInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.FSLimitException;
|
||||
|
@ -2029,4 +2030,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
namesystem.checkSuperuserPrivilege();
|
||||
nn.spanReceiverHost.removeSpanReceiver(id);
|
||||
}
|
||||
|
||||
@Override // ClientNameNodeProtocol
|
||||
public ECInfo getErasureCodingInfo(String src) throws IOException {
|
||||
checkNNStartup();
|
||||
return namesystem.getErasureCodingInfo(src);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -721,6 +721,14 @@ message CreateErasureCodingZoneRequestProto {
|
|||
message CreateErasureCodingZoneResponseProto {
|
||||
}
|
||||
|
||||
message GetErasureCodingInfoRequestProto {
|
||||
required string src = 1;
|
||||
}
|
||||
|
||||
message GetErasureCodingInfoResponseProto {
|
||||
optional ECInfoProto ECInfo = 1;
|
||||
}
|
||||
|
||||
service ClientNamenodeProtocol {
|
||||
rpc getBlockLocations(GetBlockLocationsRequestProto)
|
||||
returns(GetBlockLocationsResponseProto);
|
||||
|
@ -869,4 +877,6 @@ service ClientNamenodeProtocol {
|
|||
returns(GetCurrentEditLogTxidResponseProto);
|
||||
rpc getEditsFromTxid(GetEditsFromTxidRequestProto)
|
||||
returns(GetEditsFromTxidResponseProto);
|
||||
rpc getErasureCodingInfo(GetErasureCodingInfoRequestProto)
|
||||
returns(GetErasureCodingInfoResponseProto);
|
||||
}
|
||||
|
|
|
@ -620,3 +620,31 @@ message RollingUpgradeStatusProto {
|
|||
required string blockPoolId = 1;
|
||||
optional bool finalized = 2 [default = false];
|
||||
}
|
||||
|
||||
/**
|
||||
* ECSchema options entry
|
||||
*/
|
||||
message ECSchemaOptionEntryProto {
|
||||
required string key = 1;
|
||||
required string value = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* ECShema for erasurecoding
|
||||
*/
|
||||
message ECSchemaProto {
|
||||
required string schemaName = 1;
|
||||
required string codecName = 2;
|
||||
required uint32 dataUnits = 3;
|
||||
required uint32 parityUnits = 4;
|
||||
required uint32 chunkSize = 5;
|
||||
repeated ECSchemaOptionEntryProto options = 6;
|
||||
}
|
||||
|
||||
/**
|
||||
* ECInfo
|
||||
*/
|
||||
message ECInfoProto {
|
||||
required string src = 1;
|
||||
required ECSchemaProto schema = 2;
|
||||
}
|
|
@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.protocol.ECInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -29,8 +31,7 @@ import org.junit.Test;
|
|||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestErasureCodingZones {
|
||||
private final int NUM_OF_DATANODES = 3;
|
||||
|
@ -148,4 +149,37 @@ public class TestErasureCodingZones {
|
|||
"destination have different erasure coding policies", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetErasureCodingInfo() throws Exception {
|
||||
String src = "/ec";
|
||||
final Path ecDir = new Path(src);
|
||||
fs.mkdir(ecDir, FsPermission.getDirDefault());
|
||||
// dir ECInfo before creating ec zone
|
||||
assertNull(fs.getClient().getErasureCodingInfo(src));
|
||||
// dir ECInfo after creating ec zone
|
||||
fs.getClient().createErasureCodingZone(src);
|
||||
verifyErasureCodingInfo(src);
|
||||
fs.create(new Path(ecDir, "/child1")).close();
|
||||
// verify for the files in ec zone
|
||||
verifyErasureCodingInfo(src + "/child1");
|
||||
}
|
||||
|
||||
private void verifyErasureCodingInfo(String src) throws IOException {
|
||||
ECInfo ecInfo = fs.getClient().getErasureCodingInfo(src);
|
||||
assertNotNull("ECInfo should have been non-null", ecInfo);
|
||||
assertEquals(src, ecInfo.getSrc());
|
||||
ECSchema schema = ecInfo.getSchema();
|
||||
assertNotNull(schema);
|
||||
assertEquals("Default schema should be returned", "RS-6-3",
|
||||
schema.getSchemaName());
|
||||
assertEquals("Default codec(rs) should be returned", "rs",
|
||||
schema.getCodecName());
|
||||
assertEquals("Default numDataUnits should be used", 6,
|
||||
schema.getNumDataUnits());
|
||||
assertEquals("Default numParityUnits should be used", 3,
|
||||
schema.getNumParityUnits());
|
||||
assertEquals("Default chunkSize should be used",
|
||||
ECSchema.DEFAULT_CHUNK_SIZE, schema.getChunkSize());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue