HDFS-8375. Add cellSize as an XAttr to ECZone. Contributed by Vinayakumar B.
This commit is contained in:
parent
914580934c
commit
91c81fdc24
|
@ -49,7 +49,8 @@ public class HdfsFileStatus {
|
||||||
|
|
||||||
private final FileEncryptionInfo feInfo;
|
private final FileEncryptionInfo feInfo;
|
||||||
|
|
||||||
private final ECSchema schema;
|
private final ECSchema ecSchema;
|
||||||
|
private final int stripeCellSize;
|
||||||
|
|
||||||
// Used by dir, not including dot and dotdot. Always zero for a regular file.
|
// Used by dir, not including dot and dotdot. Always zero for a regular file.
|
||||||
private final int childrenNum;
|
private final int childrenNum;
|
||||||
|
@ -76,7 +77,7 @@ public class HdfsFileStatus {
|
||||||
long blocksize, long modification_time, long access_time,
|
long blocksize, long modification_time, long access_time,
|
||||||
FsPermission permission, String owner, String group, byte[] symlink,
|
FsPermission permission, String owner, String group, byte[] symlink,
|
||||||
byte[] path, long fileId, int childrenNum, FileEncryptionInfo feInfo,
|
byte[] path, long fileId, int childrenNum, FileEncryptionInfo feInfo,
|
||||||
byte storagePolicy, ECSchema schema) {
|
byte storagePolicy, ECSchema ecSchema, int stripeCellSize) {
|
||||||
this.length = length;
|
this.length = length;
|
||||||
this.isdir = isdir;
|
this.isdir = isdir;
|
||||||
this.block_replication = (short)block_replication;
|
this.block_replication = (short)block_replication;
|
||||||
|
@ -96,7 +97,8 @@ public class HdfsFileStatus {
|
||||||
this.childrenNum = childrenNum;
|
this.childrenNum = childrenNum;
|
||||||
this.feInfo = feInfo;
|
this.feInfo = feInfo;
|
||||||
this.storagePolicy = storagePolicy;
|
this.storagePolicy = storagePolicy;
|
||||||
this.schema = schema;
|
this.ecSchema = ecSchema;
|
||||||
|
this.stripeCellSize = stripeCellSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -255,7 +257,11 @@ public class HdfsFileStatus {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ECSchema getECSchema() {
|
public ECSchema getECSchema() {
|
||||||
return schema;
|
return ecSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getStripeCellSize() {
|
||||||
|
return stripeCellSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
public final int getChildrenNum() {
|
public final int getChildrenNum() {
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class SnapshottableDirectoryStatus {
|
||||||
int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
|
int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
|
||||||
this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
|
this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
|
||||||
access_time, permission, owner, group, null, localName, inodeId,
|
access_time, permission, owner, group, null, localName, inodeId,
|
||||||
childrenNum, null, HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, null);
|
childrenNum, null, HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, null, 0);
|
||||||
this.snapshotNumber = snapshotNumber;
|
this.snapshotNumber = snapshotNumber;
|
||||||
this.snapshotQuota = snapshotQuota;
|
this.snapshotQuota = snapshotQuota;
|
||||||
this.parentFullPath = parentFullPath;
|
this.parentFullPath = parentFullPath;
|
||||||
|
|
|
@ -132,7 +132,7 @@ class JsonUtilClient {
|
||||||
blockSize, mTime, aTime, permission, owner, group,
|
blockSize, mTime, aTime, permission, owner, group,
|
||||||
symlink, DFSUtilClient.string2Bytes(localName),
|
symlink, DFSUtilClient.string2Bytes(localName),
|
||||||
fileId, childrenNum, null,
|
fileId, childrenNum, null,
|
||||||
storagePolicy, null);
|
storagePolicy, null, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to an ExtendedBlock object. */
|
/** Convert a Json map to an ExtendedBlock object. */
|
||||||
|
|
|
@ -227,9 +227,11 @@
|
||||||
(Yi Liu via jing9)
|
(Yi Liu via jing9)
|
||||||
|
|
||||||
HDFS-8320. Erasure coding: consolidate striping-related terminologies. (zhz)
|
HDFS-8320. Erasure coding: consolidate striping-related terminologies. (zhz)
|
||||||
|
|
||||||
HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking queue
|
HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking queue
|
||||||
configurable in DFSStripedOutputStream. (Li Bo)
|
configurable in DFSStripedOutputStream. (Li Bo)
|
||||||
|
|
||||||
HDFS-8378. Erasure Coding: Few improvements for the erasure coding worker.
|
HDFS-8378. Erasure Coding: Few improvements for the erasure coding worker.
|
||||||
(Rakesh R via waltersu4549)
|
(Rakesh R via waltersu4549)
|
||||||
|
|
||||||
|
HDFS-8375. Add cellSize as an XAttr to ECZone. ( Vinayakumar B via zhz).
|
||||||
|
|
|
@ -1197,7 +1197,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
if (fileInfo != null) {
|
if (fileInfo != null) {
|
||||||
ECSchema schema = fileInfo.getECSchema();
|
ECSchema schema = fileInfo.getECSchema();
|
||||||
if (schema != null) {
|
if (schema != null) {
|
||||||
return new DFSStripedInputStream(this, src, verifyChecksum, schema);
|
return new DFSStripedInputStream(this, src, verifyChecksum, schema,
|
||||||
|
fileInfo.getStripeCellSize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return new DFSInputStream(this, src, verifyChecksum);
|
return new DFSInputStream(this, src, verifyChecksum);
|
||||||
|
@ -3009,12 +3010,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
return new EncryptionZoneIterator(namenode, traceSampler);
|
return new EncryptionZoneIterator(namenode, traceSampler);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createErasureCodingZone(String src, ECSchema schema)
|
public void createErasureCodingZone(String src, ECSchema schema, int cellSize)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
TraceScope scope = getPathTraceScope("createErasureCodingZone", src);
|
TraceScope scope = getPathTraceScope("createErasureCodingZone", src);
|
||||||
try {
|
try {
|
||||||
namenode.createErasureCodingZone(src, schema);
|
namenode.createErasureCodingZone(src, schema, cellSize);
|
||||||
} catch (RemoteException re) {
|
} catch (RemoteException re) {
|
||||||
throw re.unwrapRemoteException(AccessControlException.class,
|
throw re.unwrapRemoteException(AccessControlException.class,
|
||||||
SafeModeException.class,
|
SafeModeException.class,
|
||||||
|
|
|
@ -125,12 +125,12 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
private final CompletionService<Integer> readingService;
|
private final CompletionService<Integer> readingService;
|
||||||
|
|
||||||
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
|
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
|
||||||
ECSchema schema) throws IOException {
|
ECSchema schema, int cellSize) throws IOException {
|
||||||
super(dfsClient, src, verifyChecksum);
|
super(dfsClient, src, verifyChecksum);
|
||||||
|
|
||||||
assert schema != null;
|
assert schema != null;
|
||||||
this.schema = schema;
|
this.schema = schema;
|
||||||
cellSize = schema.getChunkSize();
|
this.cellSize = cellSize;
|
||||||
dataBlkNum = (short) schema.getNumDataUnits();
|
dataBlkNum = (short) schema.getNumDataUnits();
|
||||||
parityBlkNum = (short) schema.getNumParityUnits();
|
parityBlkNum = (short) schema.getNumParityUnits();
|
||||||
groupSize = dataBlkNum;
|
groupSize = dataBlkNum;
|
||||||
|
@ -189,7 +189,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
|
targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
|
||||||
// The purpose is to get start offset into each block.
|
// The purpose is to get start offset into each block.
|
||||||
long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema,
|
long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema,
|
||||||
targetBlockGroup, offsetIntoBlockGroup);
|
cellSize, targetBlockGroup, offsetIntoBlockGroup);
|
||||||
Preconditions.checkNotNull(offsetsForInternalBlocks);
|
Preconditions.checkNotNull(offsetsForInternalBlocks);
|
||||||
|
|
||||||
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
|
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
|
||||||
|
@ -514,8 +514,8 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
// Refresh the striped block group
|
// Refresh the striped block group
|
||||||
LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
|
LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
|
||||||
|
|
||||||
AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, blockGroup,
|
AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, cellSize,
|
||||||
start, end, buf, offset);
|
blockGroup, start, end, buf, offset);
|
||||||
for (AlignedStripe stripe : stripes) {
|
for (AlignedStripe stripe : stripes) {
|
||||||
fetchOneStripe(blockGroup, buf, stripe, corruptedBlockMap);
|
fetchOneStripe(blockGroup, buf, stripe, corruptedBlockMap);
|
||||||
}
|
}
|
||||||
|
|
|
@ -230,7 +230,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
||||||
|
|
||||||
final ECSchema schema = stat.getECSchema();
|
final ECSchema schema = stat.getECSchema();
|
||||||
final int numParityBlocks = schema.getNumParityUnits();
|
final int numParityBlocks = schema.getNumParityUnits();
|
||||||
cellSize = schema.getChunkSize();
|
cellSize = stat.getStripeCellSize();
|
||||||
numDataBlocks = schema.getNumDataUnits();
|
numDataBlocks = schema.getNumDataUnits();
|
||||||
numAllBlocks = numDataBlocks + numParityBlocks;
|
numAllBlocks = numDataBlocks + numParityBlocks;
|
||||||
|
|
||||||
|
|
|
@ -2281,16 +2281,17 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
*
|
*
|
||||||
* @param path Directory to create the ec zone
|
* @param path Directory to create the ec zone
|
||||||
* @param schema ECSchema for the zone. If not specified default will be used.
|
* @param schema ECSchema for the zone. If not specified default will be used.
|
||||||
|
* @param cellSize Cellsize for the striped erasure coding
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void createErasureCodingZone(final Path path, final ECSchema schema)
|
public void createErasureCodingZone(final Path path, final ECSchema schema,
|
||||||
throws IOException {
|
final int cellSize) throws IOException {
|
||||||
Path absF = fixRelativePart(path);
|
Path absF = fixRelativePart(path);
|
||||||
new FileSystemLinkResolver<Void>() {
|
new FileSystemLinkResolver<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void doCall(final Path p) throws IOException,
|
public Void doCall(final Path p) throws IOException,
|
||||||
UnresolvedLinkException {
|
UnresolvedLinkException {
|
||||||
dfs.createErasureCodingZone(getPathName(p), schema);
|
dfs.createErasureCodingZone(getPathName(p), schema, cellSize);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2298,7 +2299,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
public Void next(final FileSystem fs, final Path p) throws IOException {
|
public Void next(final FileSystem fs, final Path p) throws IOException {
|
||||||
if (fs instanceof DistributedFileSystem) {
|
if (fs instanceof DistributedFileSystem) {
|
||||||
DistributedFileSystem myDfs = (DistributedFileSystem) fs;
|
DistributedFileSystem myDfs = (DistributedFileSystem) fs;
|
||||||
myDfs.createErasureCodingZone(p, schema);
|
myDfs.createErasureCodingZone(p, schema, cellSize);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
|
|
|
@ -1463,7 +1463,7 @@ public interface ClientProtocol {
|
||||||
* default
|
* default
|
||||||
*/
|
*/
|
||||||
@AtMostOnce
|
@AtMostOnce
|
||||||
public void createErasureCodingZone(String src, ECSchema schema)
|
public void createErasureCodingZone(String src, ECSchema schema, int cellSize)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -25,10 +25,12 @@ public class ErasureCodingZoneInfo {
|
||||||
|
|
||||||
private String dir;
|
private String dir;
|
||||||
private ECSchema schema;
|
private ECSchema schema;
|
||||||
|
private int cellSize;
|
||||||
|
|
||||||
public ErasureCodingZoneInfo(String dir, ECSchema schema) {
|
public ErasureCodingZoneInfo(String dir, ECSchema schema, int cellSize) {
|
||||||
this.dir = dir;
|
this.dir = dir;
|
||||||
this.schema = schema;
|
this.schema = schema;
|
||||||
|
this.cellSize = cellSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -49,8 +51,16 @@ public class ErasureCodingZoneInfo {
|
||||||
return schema;
|
return schema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get cellSize for the EC Zone
|
||||||
|
*/
|
||||||
|
public int getCellSize() {
|
||||||
|
return cellSize;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Dir: " + getDir() + ", Schema: " + schema;
|
return "Dir: " + getDir() + ", Schema: " + schema + ", cellSize: "
|
||||||
|
+ cellSize;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,10 +59,11 @@ public class HdfsLocatedFileStatus extends HdfsFileStatus {
|
||||||
int block_replication, long blocksize, long modification_time,
|
int block_replication, long blocksize, long modification_time,
|
||||||
long access_time, FsPermission permission, String owner, String group,
|
long access_time, FsPermission permission, String owner, String group,
|
||||||
byte[] symlink, byte[] path, long fileId, LocatedBlocks locations,
|
byte[] symlink, byte[] path, long fileId, LocatedBlocks locations,
|
||||||
int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy, ECSchema schema) {
|
int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy,
|
||||||
|
ECSchema schema, int stripeCellSize) {
|
||||||
super(length, isdir, block_replication, blocksize, modification_time,
|
super(length, isdir, block_replication, blocksize, modification_time,
|
||||||
access_time, permission, owner, group, symlink, path, fileId,
|
access_time, permission, owner, group, symlink, path, fileId,
|
||||||
childrenNum, feInfo, storagePolicy, schema);
|
childrenNum, feInfo, storagePolicy, schema, stripeCellSize);
|
||||||
this.locations = locations;
|
this.locations = locations;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1408,7 +1408,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
||||||
try {
|
try {
|
||||||
ECSchema schema = req.hasSchema() ? PBHelper.convertECSchema(req
|
ECSchema schema = req.hasSchema() ? PBHelper.convertECSchema(req
|
||||||
.getSchema()) : null;
|
.getSchema()) : null;
|
||||||
server.createErasureCodingZone(req.getSrc(), schema);
|
int cellSize = req.hasCellSize() ? req.getCellSize() : 0;
|
||||||
|
server.createErasureCodingZone(req.getSrc(), schema, cellSize);
|
||||||
return CreateErasureCodingZoneResponseProto.newBuilder().build();
|
return CreateErasureCodingZoneResponseProto.newBuilder().build();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
|
|
|
@ -1422,7 +1422,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void createErasureCodingZone(String src, ECSchema schema)
|
public void createErasureCodingZone(String src, ECSchema schema, int cellSize)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final CreateErasureCodingZoneRequestProto.Builder builder =
|
final CreateErasureCodingZoneRequestProto.Builder builder =
|
||||||
CreateErasureCodingZoneRequestProto.newBuilder();
|
CreateErasureCodingZoneRequestProto.newBuilder();
|
||||||
|
@ -1430,6 +1430,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
if (schema != null) {
|
if (schema != null) {
|
||||||
builder.setSchema(PBHelper.convertECSchema(schema));
|
builder.setSchema(PBHelper.convertECSchema(schema));
|
||||||
}
|
}
|
||||||
|
if (cellSize > 0) {
|
||||||
|
builder.setCellSize(cellSize);
|
||||||
|
}
|
||||||
CreateErasureCodingZoneRequestProto req = builder.build();
|
CreateErasureCodingZoneRequestProto req = builder.build();
|
||||||
try {
|
try {
|
||||||
rpcProxy.createErasureCodingZone(null, req);
|
rpcProxy.createErasureCodingZone(null, req);
|
||||||
|
|
|
@ -1506,7 +1506,8 @@ public class PBHelper {
|
||||||
fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null,
|
fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null,
|
||||||
fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy()
|
fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy()
|
||||||
: HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED,
|
: HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED,
|
||||||
fs.hasEcSchema() ? PBHelper.convertECSchema(fs.getEcSchema()) : null);
|
fs.hasEcSchema() ? PBHelper.convertECSchema(fs.getEcSchema()) : null,
|
||||||
|
fs.hasStripeCellSize() ? fs.getStripeCellSize() : 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static SnapshottableDirectoryStatus convert(
|
public static SnapshottableDirectoryStatus convert(
|
||||||
|
@ -1570,6 +1571,7 @@ public class PBHelper {
|
||||||
if(fs.getECSchema() != null) {
|
if(fs.getECSchema() != null) {
|
||||||
builder.setEcSchema(PBHelper.convertECSchema(fs.getECSchema()));
|
builder.setEcSchema(PBHelper.convertECSchema(fs.getECSchema()));
|
||||||
}
|
}
|
||||||
|
builder.setStripeCellSize(fs.getStripeCellSize());
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3157,12 +3159,14 @@ public class PBHelper {
|
||||||
|
|
||||||
public static ErasureCodingZoneInfoProto convertECZoneInfo(ErasureCodingZoneInfo ecZoneInfo) {
|
public static ErasureCodingZoneInfoProto convertECZoneInfo(ErasureCodingZoneInfo ecZoneInfo) {
|
||||||
return ErasureCodingZoneInfoProto.newBuilder().setDir(ecZoneInfo.getDir())
|
return ErasureCodingZoneInfoProto.newBuilder().setDir(ecZoneInfo.getDir())
|
||||||
.setSchema(convertECSchema(ecZoneInfo.getSchema())).build();
|
.setSchema(convertECSchema(ecZoneInfo.getSchema()))
|
||||||
|
.setCellSize(ecZoneInfo.getCellSize()).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ErasureCodingZoneInfo convertECZoneInfo(ErasureCodingZoneInfoProto ecZoneInfoProto) {
|
public static ErasureCodingZoneInfo convertECZoneInfo(ErasureCodingZoneInfoProto ecZoneInfoProto) {
|
||||||
return new ErasureCodingZoneInfo(ecZoneInfoProto.getDir(),
|
return new ErasureCodingZoneInfo(ecZoneInfoProto.getDir(),
|
||||||
convertECSchema(ecZoneInfoProto.getSchema()));
|
convertECSchema(ecZoneInfoProto.getSchema()),
|
||||||
|
ecZoneInfoProto.getCellSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
|
public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
|
||||||
|
@ -3196,9 +3200,11 @@ public class PBHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
ECSchema ecSchema = convertECSchema(blockEcRecoveryInfoProto.getEcSchema());
|
ECSchema ecSchema = convertECSchema(blockEcRecoveryInfoProto.getEcSchema());
|
||||||
|
int cellSize = blockEcRecoveryInfoProto.getCellSize();
|
||||||
|
|
||||||
return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
|
return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
|
||||||
targetStorageUuids, convertStorageTypes, liveBlkIndices, ecSchema);
|
targetStorageUuids, convertStorageTypes, liveBlkIndices, ecSchema,
|
||||||
|
cellSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
|
public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
|
||||||
|
@ -3224,6 +3230,7 @@ public class PBHelper {
|
||||||
builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
|
builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
|
||||||
|
|
||||||
builder.setEcSchema(convertECSchema(blockEcRecoveryInfo.getECSchema()));
|
builder.setEcSchema(convertECSchema(blockEcRecoveryInfo.getECSchema()));
|
||||||
|
builder.setCellSize(blockEcRecoveryInfo.getCellSize());
|
||||||
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingZoneInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
@ -1560,14 +1561,14 @@ public class BlockManager {
|
||||||
assert rw instanceof ErasureCodingWork;
|
assert rw instanceof ErasureCodingWork;
|
||||||
assert rw.targets.length > 0;
|
assert rw.targets.length > 0;
|
||||||
String src = block.getBlockCollection().getName();
|
String src = block.getBlockCollection().getName();
|
||||||
ECSchema ecSchema = null;
|
ErasureCodingZoneInfo ecZoneInfo = null;
|
||||||
try {
|
try {
|
||||||
ecSchema = namesystem.getECSchemaForPath(src);
|
ecZoneInfo = namesystem.getErasureCodingZoneInfoForPath(src);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
blockLog
|
blockLog
|
||||||
.warn("Failed to get the EC schema for the file {} ", src);
|
.warn("Failed to get the EC zone info for the file {} ", src);
|
||||||
}
|
}
|
||||||
if (ecSchema == null) {
|
if (ecZoneInfo == null) {
|
||||||
blockLog.warn("No EC schema found for the file {}. "
|
blockLog.warn("No EC schema found for the file {}. "
|
||||||
+ "So cannot proceed for recovery", src);
|
+ "So cannot proceed for recovery", src);
|
||||||
// TODO: we may have to revisit later for what we can do better to
|
// TODO: we may have to revisit later for what we can do better to
|
||||||
|
@ -1577,7 +1578,8 @@ public class BlockManager {
|
||||||
rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
|
rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
|
||||||
new ExtendedBlock(namesystem.getBlockPoolId(), block),
|
new ExtendedBlock(namesystem.getBlockPoolId(), block),
|
||||||
rw.srcNodes, rw.targets,
|
rw.srcNodes, rw.targets,
|
||||||
((ErasureCodingWork) rw).liveBlockIndicies, ecSchema);
|
((ErasureCodingWork) rw).liveBlockIndicies,
|
||||||
|
ecZoneInfo.getSchema(), ecZoneInfo.getCellSize());
|
||||||
} else {
|
} else {
|
||||||
rw.srcNodes[0].addBlockToBeReplicated(block, targets);
|
rw.srcNodes[0].addBlockToBeReplicated(block, targets);
|
||||||
}
|
}
|
||||||
|
|
|
@ -610,10 +610,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
*/
|
*/
|
||||||
void addBlockToBeErasureCoded(ExtendedBlock block,
|
void addBlockToBeErasureCoded(ExtendedBlock block,
|
||||||
DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
|
DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
|
||||||
short[] liveBlockIndices, ECSchema ecSchema) {
|
short[] liveBlockIndices, ECSchema ecSchema, int cellSize) {
|
||||||
assert (block != null && sources != null && sources.length > 0);
|
assert (block != null && sources != null && sources.length > 0);
|
||||||
BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
|
BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
|
||||||
liveBlockIndices, ecSchema);
|
liveBlockIndices, ecSchema, cellSize);
|
||||||
erasurecodeBlocks.offer(task);
|
erasurecodeBlocks.offer(task);
|
||||||
BlockManager.LOG.debug("Adding block recovery task " + task + "to "
|
BlockManager.LOG.debug("Adding block recovery task " + task + "to "
|
||||||
+ getName() + ", current queue size is " + erasurecodeBlocks.size());
|
+ getName() + ", current queue size is " + erasurecodeBlocks.size());
|
||||||
|
|
|
@ -269,7 +269,7 @@ public final class ErasureCodingWorker {
|
||||||
ECSchema schema = recoveryInfo.getECSchema();
|
ECSchema schema = recoveryInfo.getECSchema();
|
||||||
dataBlkNum = schema.getNumDataUnits();
|
dataBlkNum = schema.getNumDataUnits();
|
||||||
parityBlkNum = schema.getNumParityUnits();
|
parityBlkNum = schema.getNumParityUnits();
|
||||||
cellSize = schema.getChunkSize();
|
cellSize = recoveryInfo.getCellSize();
|
||||||
|
|
||||||
blockGroup = recoveryInfo.getExtendedBlock();
|
blockGroup = recoveryInfo.getExtendedBlock();
|
||||||
|
|
||||||
|
|
|
@ -19,12 +19,20 @@ package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.XAttr;
|
import org.apache.hadoop.fs.XAttr;
|
||||||
import org.apache.hadoop.fs.XAttrSetFlag;
|
import org.apache.hadoop.fs.XAttrSetFlag;
|
||||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingZoneInfo;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingZoneInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
@ -78,17 +86,21 @@ public class ErasureCodingZoneManager {
|
||||||
: inode.getXAttrFeature().getXAttrs();
|
: inode.getXAttrFeature().getXAttrs();
|
||||||
for (XAttr xAttr : xAttrs) {
|
for (XAttr xAttr : xAttrs) {
|
||||||
if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) {
|
if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) {
|
||||||
String schemaName = new String(xAttr.getValue());
|
ByteArrayInputStream bIn=new ByteArrayInputStream(xAttr.getValue());
|
||||||
|
DataInputStream dIn=new DataInputStream(bIn);
|
||||||
|
int cellSize = WritableUtils.readVInt(dIn);
|
||||||
|
String schemaName = WritableUtils.readString(dIn);
|
||||||
ECSchema schema = dir.getFSNamesystem().getECSchemaManager()
|
ECSchema schema = dir.getFSNamesystem().getECSchemaManager()
|
||||||
.getSchema(schemaName);
|
.getSchema(schemaName);
|
||||||
return new ErasureCodingZoneInfo(inode.getFullPathName(), schema);
|
return new ErasureCodingZoneInfo(inode.getFullPathName(), schema,
|
||||||
|
cellSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
XAttr createErasureCodingZone(String src, ECSchema schema)
|
XAttr createErasureCodingZone(String src, ECSchema schema, int cellSize)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
assert dir.hasWriteLock();
|
assert dir.hasWriteLock();
|
||||||
final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
|
final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
|
||||||
|
@ -113,10 +125,24 @@ public class ErasureCodingZoneManager {
|
||||||
schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
|
schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now persist the schema name in xattr
|
if (cellSize <= 0) {
|
||||||
byte[] schemaBytes = schema.getSchemaName().getBytes();
|
cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
|
||||||
final XAttr ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_ZONE,
|
}
|
||||||
schemaBytes);
|
|
||||||
|
// Write the cellsize first and then schema name
|
||||||
|
final XAttr ecXAttr;
|
||||||
|
DataOutputStream dOut = null;
|
||||||
|
try {
|
||||||
|
ByteArrayOutputStream bOut = new ByteArrayOutputStream();
|
||||||
|
dOut = new DataOutputStream(bOut);
|
||||||
|
WritableUtils.writeVInt(dOut, cellSize);
|
||||||
|
// Now persist the schema name in xattr
|
||||||
|
WritableUtils.writeString(dOut, schema.getSchemaName());
|
||||||
|
ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_ZONE,
|
||||||
|
bOut.toByteArray());
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeStream(dOut);
|
||||||
|
}
|
||||||
final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
|
final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
|
||||||
xattrs.add(ecXAttr);
|
xattrs.add(ecXAttr);
|
||||||
FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,
|
FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.fs.permission.FsAction;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingZoneInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
|
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
|
@ -317,7 +318,7 @@ class FSDirStatAndListingOp {
|
||||||
if (fsd.getINode4DotSnapshot(srcs) != null) {
|
if (fsd.getINode4DotSnapshot(srcs) != null) {
|
||||||
return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
|
return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
|
||||||
HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
|
HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
|
||||||
HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, null);
|
HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, null, 0);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -385,8 +386,10 @@ class FSDirStatAndListingOp {
|
||||||
final FileEncryptionInfo feInfo = isRawPath ? null :
|
final FileEncryptionInfo feInfo = isRawPath ? null :
|
||||||
fsd.getFileEncryptionInfo(node, snapshot, iip);
|
fsd.getFileEncryptionInfo(node, snapshot, iip);
|
||||||
|
|
||||||
final ECSchema schema = fsd.getECSchema(iip);
|
final ErasureCodingZoneInfo ecZoneInfo = fsd.getECZoneInfo(iip);
|
||||||
|
final ECSchema schema = ecZoneInfo != null ? ecZoneInfo.getSchema() : null;
|
||||||
|
final int cellSize = ecZoneInfo != null ? ecZoneInfo.getCellSize() : 0;
|
||||||
|
|
||||||
if (node.isFile()) {
|
if (node.isFile()) {
|
||||||
final INodeFile fileNode = node.asFile();
|
final INodeFile fileNode = node.asFile();
|
||||||
size = fileNode.computeFileSize(snapshot);
|
size = fileNode.computeFileSize(snapshot);
|
||||||
|
@ -417,7 +420,8 @@ class FSDirStatAndListingOp {
|
||||||
childrenNum,
|
childrenNum,
|
||||||
feInfo,
|
feInfo,
|
||||||
storagePolicy,
|
storagePolicy,
|
||||||
schema);
|
schema,
|
||||||
|
cellSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static INodeAttributes getINodeAttributes(
|
private static INodeAttributes getINodeAttributes(
|
||||||
|
@ -464,8 +468,10 @@ class FSDirStatAndListingOp {
|
||||||
}
|
}
|
||||||
int childrenNum = node.isDirectory() ?
|
int childrenNum = node.isDirectory() ?
|
||||||
node.asDirectory().getChildrenNum(snapshot) : 0;
|
node.asDirectory().getChildrenNum(snapshot) : 0;
|
||||||
final ECSchema schema = fsd.getECSchema(iip);
|
final ErasureCodingZoneInfo ecZoneInfo = fsd.getECZoneInfo(iip);
|
||||||
|
final ECSchema schema = ecZoneInfo != null ? ecZoneInfo.getSchema() : null;
|
||||||
|
final int cellSize = ecZoneInfo != null ? ecZoneInfo.getCellSize() : 0;
|
||||||
|
|
||||||
HdfsLocatedFileStatus status =
|
HdfsLocatedFileStatus status =
|
||||||
new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
|
new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
|
||||||
blocksize, node.getModificationTime(snapshot),
|
blocksize, node.getModificationTime(snapshot),
|
||||||
|
@ -473,7 +479,8 @@ class FSDirStatAndListingOp {
|
||||||
getPermissionForFileStatus(nodeAttrs, isEncrypted),
|
getPermissionForFileStatus(nodeAttrs, isEncrypted),
|
||||||
nodeAttrs.getUserName(), nodeAttrs.getGroupName(),
|
nodeAttrs.getUserName(), nodeAttrs.getGroupName(),
|
||||||
node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
|
node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
|
||||||
node.getId(), loc, childrenNum, feInfo, storagePolicy, schema);
|
node.getId(), loc, childrenNum, feInfo, storagePolicy, schema,
|
||||||
|
cellSize);
|
||||||
// Set caching information for the located blocks.
|
// Set caching information for the located blocks.
|
||||||
if (loc != null) {
|
if (loc != null) {
|
||||||
CacheManager cacheManager = fsd.getFSNamesystem().getCacheManager();
|
CacheManager cacheManager = fsd.getFSNamesystem().getCacheManager();
|
||||||
|
|
|
@ -1230,11 +1230,11 @@ public class FSDirectory implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
XAttr createErasureCodingZone(String src, ECSchema schema)
|
XAttr createErasureCodingZone(String src, ECSchema schema, int cellSize)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
return ecZoneManager.createErasureCodingZone(src, schema);
|
return ecZoneManager.createErasureCodingZone(src, schema, cellSize);
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -7555,14 +7555,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
* @param srcArg the path of a directory which will be the root of the
|
* @param srcArg the path of a directory which will be the root of the
|
||||||
* erasure coding zone. The directory must be empty.
|
* erasure coding zone. The directory must be empty.
|
||||||
* @param schema ECSchema for the erasure coding zone
|
* @param schema ECSchema for the erasure coding zone
|
||||||
*
|
* @param cellSize Cell size of stripe
|
||||||
* @throws AccessControlException if the caller is not the superuser.
|
* @throws AccessControlException if the caller is not the superuser.
|
||||||
* @throws UnresolvedLinkException if the path can't be resolved.
|
* @throws UnresolvedLinkException if the path can't be resolved.
|
||||||
* @throws SafeModeException if the Namenode is in safe mode.
|
* @throws SafeModeException if the Namenode is in safe mode.
|
||||||
*/
|
*/
|
||||||
void createErasureCodingZone(final String srcArg, final ECSchema schema,
|
void createErasureCodingZone(final String srcArg, final ECSchema schema,
|
||||||
final boolean logRetryCache) throws IOException, UnresolvedLinkException,
|
int cellSize, final boolean logRetryCache) throws IOException,
|
||||||
SafeModeException, AccessControlException {
|
UnresolvedLinkException, SafeModeException, AccessControlException {
|
||||||
String src = srcArg;
|
String src = srcArg;
|
||||||
HdfsFileStatus resultingStat = null;
|
HdfsFileStatus resultingStat = null;
|
||||||
FSPermissionChecker pc = null;
|
FSPermissionChecker pc = null;
|
||||||
|
@ -7585,7 +7585,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
checkNameNodeSafeMode("Cannot create erasure coding zone on " + src);
|
checkNameNodeSafeMode("Cannot create erasure coding zone on " + src);
|
||||||
src = dir.resolvePath(pc, src, pathComponents);
|
src = dir.resolvePath(pc, src, pathComponents);
|
||||||
|
|
||||||
final XAttr ecXAttr = dir.createErasureCodingZone(src, schema);
|
final XAttr ecXAttr = dir.createErasureCodingZone(src, schema, cellSize);
|
||||||
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
|
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
|
||||||
xAttrs.add(ecXAttr);
|
xAttrs.add(ecXAttr);
|
||||||
getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
|
getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
|
||||||
|
@ -7604,9 +7604,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
*/
|
*/
|
||||||
ErasureCodingInfo getErasureCodingInfo(String src) throws AccessControlException,
|
ErasureCodingInfo getErasureCodingInfo(String src) throws AccessControlException,
|
||||||
UnresolvedLinkException, IOException {
|
UnresolvedLinkException, IOException {
|
||||||
ECSchema schema = getECSchemaForPath(src);
|
ErasureCodingZoneInfo zoneInfo = getErasureCodingZoneInfo(src);
|
||||||
if (schema != null) {
|
if (zoneInfo != null) {
|
||||||
return new ErasureCodingInfo(src, schema);
|
return new ErasureCodingInfo(src, zoneInfo.getSchema());
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -7614,21 +7614,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
/**
|
/**
|
||||||
* Get the erasure coding zone information for specified path
|
* Get the erasure coding zone information for specified path
|
||||||
*/
|
*/
|
||||||
ErasureCodingZoneInfo getErasureCodingZoneInfo(String src) throws AccessControlException,
|
ErasureCodingZoneInfo getErasureCodingZoneInfo(String src)
|
||||||
UnresolvedLinkException, IOException {
|
throws AccessControlException, UnresolvedLinkException, IOException {
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
final byte[][] pathComponents = FSDirectory
|
|
||||||
.getPathComponentsForReservedPath(src);
|
|
||||||
final FSPermissionChecker pc = getPermissionChecker();
|
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
src = dir.resolvePath(pc, src, pathComponents);
|
return getErasureCodingZoneInfoForPath(src);
|
||||||
final INodesInPath iip = dir.getINodesInPath(src, true);
|
|
||||||
if (isPermissionEnabled) {
|
|
||||||
dir.checkPathAccess(pc, iip, FsAction.READ);
|
|
||||||
}
|
|
||||||
return dir.getECZoneInfo(iip);
|
|
||||||
} finally {
|
} finally {
|
||||||
readUnlock();
|
readUnlock();
|
||||||
}
|
}
|
||||||
|
@ -7849,24 +7841,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ECSchema getECSchemaForPath(String src) throws IOException {
|
public ErasureCodingZoneInfo getErasureCodingZoneInfoForPath(String src)
|
||||||
checkOperation(OperationCategory.READ);
|
throws IOException {
|
||||||
final byte[][] pathComponents = FSDirectory
|
final byte[][] pathComponents = FSDirectory
|
||||||
.getPathComponentsForReservedPath(src);
|
.getPathComponentsForReservedPath(src);
|
||||||
final FSPermissionChecker pc = getPermissionChecker();
|
final FSPermissionChecker pc = getPermissionChecker();
|
||||||
readLock();
|
src = dir.resolvePath(pc, src, pathComponents);
|
||||||
try {
|
final INodesInPath iip = dir.getINodesInPath(src, true);
|
||||||
checkOperation(OperationCategory.READ);
|
if (isPermissionEnabled) {
|
||||||
src = dir.resolvePath(pc, src, pathComponents);
|
dir.checkPathAccess(pc, iip, FsAction.READ);
|
||||||
final INodesInPath iip = dir.getINodesInPath(src, true);
|
|
||||||
if (isPermissionEnabled) {
|
|
||||||
dir.checkPathAccess(pc, iip, FsAction.READ);
|
|
||||||
}
|
|
||||||
// Get schema set for the zone
|
|
||||||
return dir.getECSchema(iip);
|
|
||||||
} finally {
|
|
||||||
readUnlock();
|
|
||||||
}
|
}
|
||||||
|
return dir.getECZoneInfo(iip);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1824,7 +1824,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public void createErasureCodingZone(String src, ECSchema schema)
|
public void createErasureCodingZone(String src, ECSchema schema, int cellSize)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
|
@ -1833,7 +1833,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
}
|
}
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
namesystem.createErasureCodingZone(src, schema, cacheEntry != null);
|
namesystem.createErasureCodingZone(src, schema, cellSize,
|
||||||
|
cacheEntry != null);
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
RetryCache.setState(cacheEntry, success);
|
RetryCache.setState(cacheEntry, success);
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingZoneInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||||
import org.apache.hadoop.hdfs.util.RwLock;
|
import org.apache.hadoop.hdfs.util.RwLock;
|
||||||
|
@ -51,12 +52,13 @@ public interface Namesystem extends RwLock, SafeMode {
|
||||||
public boolean isInSnapshot(BlockCollection bc);
|
public boolean isInSnapshot(BlockCollection bc);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the ECSchema for the specified path
|
* Gets the ECZone info for path
|
||||||
*
|
*
|
||||||
* @param src
|
* @param src
|
||||||
* - path
|
* - path
|
||||||
* @return ECSchema
|
* @return {@link ErasureCodingZoneInfo}
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public ECSchema getECSchemaForPath(String src) throws IOException;
|
public ErasureCodingZoneInfo getErasureCodingZoneInfoForPath(String src)
|
||||||
|
throws IOException;
|
||||||
}
|
}
|
|
@ -78,25 +78,22 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
|
||||||
private StorageType[] targetStorageTypes;
|
private StorageType[] targetStorageTypes;
|
||||||
private final short[] liveBlockIndices;
|
private final short[] liveBlockIndices;
|
||||||
private final ECSchema ecSchema;
|
private final ECSchema ecSchema;
|
||||||
|
private final int cellSize;
|
||||||
|
|
||||||
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
|
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
|
||||||
DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices,
|
DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices,
|
||||||
ECSchema ecSchema) {
|
ECSchema ecSchema, int cellSize) {
|
||||||
this.block = block;
|
this(block, sources, DatanodeStorageInfo
|
||||||
this.sources = sources;
|
.toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo
|
||||||
this.targets = DatanodeStorageInfo.toDatanodeInfos(targetDnStorageInfo);
|
.toStorageIDs(targetDnStorageInfo), DatanodeStorageInfo
|
||||||
this.targetStorageIDs = DatanodeStorageInfo
|
.toStorageTypes(targetDnStorageInfo), liveBlockIndices, ecSchema,
|
||||||
.toStorageIDs(targetDnStorageInfo);
|
cellSize);
|
||||||
this.targetStorageTypes = DatanodeStorageInfo
|
|
||||||
.toStorageTypes(targetDnStorageInfo);
|
|
||||||
this.liveBlockIndices = liveBlockIndices;
|
|
||||||
this.ecSchema = ecSchema;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
|
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
|
||||||
DatanodeInfo[] targets, String[] targetStorageIDs,
|
DatanodeInfo[] targets, String[] targetStorageIDs,
|
||||||
StorageType[] targetStorageTypes, short[] liveBlockIndices,
|
StorageType[] targetStorageTypes, short[] liveBlockIndices,
|
||||||
ECSchema ecSchema) {
|
ECSchema ecSchema, int cellSize) {
|
||||||
this.block = block;
|
this.block = block;
|
||||||
this.sources = sources;
|
this.sources = sources;
|
||||||
this.targets = targets;
|
this.targets = targets;
|
||||||
|
@ -104,6 +101,7 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
|
||||||
this.targetStorageTypes = targetStorageTypes;
|
this.targetStorageTypes = targetStorageTypes;
|
||||||
this.liveBlockIndices = liveBlockIndices;
|
this.liveBlockIndices = liveBlockIndices;
|
||||||
this.ecSchema = ecSchema;
|
this.ecSchema = ecSchema;
|
||||||
|
this.cellSize = cellSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ExtendedBlock getExtendedBlock() {
|
public ExtendedBlock getExtendedBlock() {
|
||||||
|
@ -134,6 +132,10 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
|
||||||
return ecSchema;
|
return ecSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getCellSize() {
|
||||||
|
return cellSize;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return new StringBuilder().append("BlockECRecoveryInfo(\n ")
|
return new StringBuilder().append("BlockECRecoveryInfo(\n ")
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.fs.shell.CommandFactory;
|
||||||
import org.apache.hadoop.fs.shell.PathData;
|
import org.apache.hadoop.fs.shell.PathData;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingZoneInfo;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingZoneInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
|
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
|
||||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
@ -88,14 +89,23 @@ public abstract class ECCommand extends Command {
|
||||||
+ "Options :\n"
|
+ "Options :\n"
|
||||||
+ " -s <schemaName> : EC schema name to encode files. "
|
+ " -s <schemaName> : EC schema name to encode files. "
|
||||||
+ "If not passed default schema will be used\n"
|
+ "If not passed default schema will be used\n"
|
||||||
|
+ " -c <cellSize> : cell size to use for striped encoding files."
|
||||||
|
+ " If not passed default cellsize of "
|
||||||
|
+ HdfsConstants.BLOCK_STRIPED_CELL_SIZE + " will be used\n"
|
||||||
+ " <path> : Path to an empty directory. Under this directory "
|
+ " <path> : Path to an empty directory. Under this directory "
|
||||||
+ "files will be encoded using specified schema";
|
+ "files will be encoded using specified schema";
|
||||||
private String schemaName;
|
private String schemaName;
|
||||||
|
private int cellSize = 0;
|
||||||
private ECSchema schema = null;
|
private ECSchema schema = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void processOptions(LinkedList<String> args) throws IOException {
|
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||||
schemaName = StringUtils.popOptionWithArgument("-s", args);
|
schemaName = StringUtils.popOptionWithArgument("-s", args);
|
||||||
|
String cellSizeStr = StringUtils.popOptionWithArgument("-c", args);
|
||||||
|
if (cellSizeStr != null) {
|
||||||
|
cellSize = (int) StringUtils.TraditionalBinaryPrefix
|
||||||
|
.string2long(cellSizeStr);
|
||||||
|
}
|
||||||
if (args.isEmpty()) {
|
if (args.isEmpty()) {
|
||||||
throw new HadoopIllegalArgumentException("<path> is missing");
|
throw new HadoopIllegalArgumentException("<path> is missing");
|
||||||
}
|
}
|
||||||
|
@ -131,7 +141,7 @@ public abstract class ECCommand extends Command {
|
||||||
throw new HadoopIllegalArgumentException(sb.toString());
|
throw new HadoopIllegalArgumentException(sb.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
dfs.createErasureCodingZone(item.path, schema);
|
dfs.createErasureCodingZone(item.path, schema, cellSize);
|
||||||
out.println("EC Zone created successfully at " + item.path);
|
out.println("EC Zone created successfully at " + item.path);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new IOException("Unable to create EC zone for the path "
|
throw new IOException("Unable to create EC zone for the path "
|
||||||
|
@ -213,4 +223,4 @@ public abstract class ECCommand extends Command {
|
||||||
out.println(sb.toString());
|
out.println(sb.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -306,6 +306,7 @@ public class StripedBlockUtil {
|
||||||
* {@link AlignedStripe}.
|
* {@link AlignedStripe}.
|
||||||
* @param ecSchema The codec schema for the file, which carries the numbers
|
* @param ecSchema The codec schema for the file, which carries the numbers
|
||||||
* of data / parity blocks, as well as cell size
|
* of data / parity blocks, as well as cell size
|
||||||
|
* @param cellSize Cell size of stripe
|
||||||
* @param blockGroup The striped block group
|
* @param blockGroup The striped block group
|
||||||
* @param rangeStartInBlockGroup The byte range's start offset in block group
|
* @param rangeStartInBlockGroup The byte range's start offset in block group
|
||||||
* @param rangeEndInBlockGroup The byte range's end offset in block group
|
* @param rangeEndInBlockGroup The byte range's end offset in block group
|
||||||
|
@ -315,28 +316,29 @@ public class StripedBlockUtil {
|
||||||
* At most 5 stripes will be generated from each logical range, as
|
* At most 5 stripes will be generated from each logical range, as
|
||||||
* demonstrated in the header of {@link AlignedStripe}.
|
* demonstrated in the header of {@link AlignedStripe}.
|
||||||
*/
|
*/
|
||||||
public static AlignedStripe[] divideByteRangeIntoStripes (
|
public static AlignedStripe[] divideByteRangeIntoStripes(ECSchema ecSchema,
|
||||||
ECSchema ecSchema, LocatedStripedBlock blockGroup,
|
int cellSize, LocatedStripedBlock blockGroup,
|
||||||
long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf,
|
long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf,
|
||||||
int offsetInBuf) {
|
int offsetInBuf) {
|
||||||
// TODO: change ECSchema naming to use cell size instead of chunk size
|
// TODO: change ECSchema naming to use cell size instead of chunk size
|
||||||
|
|
||||||
// Step 0: analyze range and calculate basic parameters
|
// Step 0: analyze range and calculate basic parameters
|
||||||
int cellSize = ecSchema.getChunkSize();
|
|
||||||
int dataBlkNum = ecSchema.getNumDataUnits();
|
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||||
|
|
||||||
// Step 1: map the byte range to StripingCells
|
// Step 1: map the byte range to StripingCells
|
||||||
StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, blockGroup,
|
StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, cellSize,
|
||||||
rangeStartInBlockGroup, rangeEndInBlockGroup);
|
blockGroup, rangeStartInBlockGroup, rangeEndInBlockGroup);
|
||||||
|
|
||||||
// Step 2: get the unmerged ranges on each internal block
|
// Step 2: get the unmerged ranges on each internal block
|
||||||
VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, cells);
|
VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, cellSize,
|
||||||
|
cells);
|
||||||
|
|
||||||
// Step 3: merge into at most 5 stripes
|
// Step 3: merge into at most 5 stripes
|
||||||
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
|
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
|
||||||
|
|
||||||
// Step 4: calculate each chunk's position in destination buffer
|
// Step 4: calculate each chunk's position in destination buffer
|
||||||
calcualteChunkPositionsInBuf(ecSchema, stripes, cells, buf, offsetInBuf);
|
calcualteChunkPositionsInBuf(ecSchema, cellSize, stripes, cells, buf,
|
||||||
|
offsetInBuf);
|
||||||
|
|
||||||
// Step 5: prepare ALLZERO blocks
|
// Step 5: prepare ALLZERO blocks
|
||||||
prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum);
|
prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum);
|
||||||
|
@ -351,19 +353,18 @@ public class StripedBlockUtil {
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
private static StripingCell[] getStripingCellsOfByteRange(ECSchema ecSchema,
|
private static StripingCell[] getStripingCellsOfByteRange(ECSchema ecSchema,
|
||||||
LocatedStripedBlock blockGroup,
|
int cellSize, LocatedStripedBlock blockGroup,
|
||||||
long rangeStartInBlockGroup, long rangeEndInBlockGroup) {
|
long rangeStartInBlockGroup, long rangeEndInBlockGroup) {
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(
|
||||||
rangeStartInBlockGroup <= rangeEndInBlockGroup &&
|
rangeStartInBlockGroup <= rangeEndInBlockGroup &&
|
||||||
rangeEndInBlockGroup < blockGroup.getBlockSize());
|
rangeEndInBlockGroup < blockGroup.getBlockSize());
|
||||||
int cellSize = ecSchema.getChunkSize();
|
|
||||||
int len = (int) (rangeEndInBlockGroup - rangeStartInBlockGroup + 1);
|
int len = (int) (rangeEndInBlockGroup - rangeStartInBlockGroup + 1);
|
||||||
int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
|
int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
|
||||||
int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize);
|
int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize);
|
||||||
int numCells = lastCellIdxInBG - firstCellIdxInBG + 1;
|
int numCells = lastCellIdxInBG - firstCellIdxInBG + 1;
|
||||||
StripingCell[] cells = new StripingCell[numCells];
|
StripingCell[] cells = new StripingCell[numCells];
|
||||||
cells[0] = new StripingCell(ecSchema, firstCellIdxInBG);
|
cells[0] = new StripingCell(ecSchema, cellSize, firstCellIdxInBG);
|
||||||
cells[numCells - 1] = new StripingCell(ecSchema, lastCellIdxInBG);
|
cells[numCells - 1] = new StripingCell(ecSchema, cellSize, lastCellIdxInBG);
|
||||||
|
|
||||||
cells[0].offset = (int) (rangeStartInBlockGroup % cellSize);
|
cells[0].offset = (int) (rangeStartInBlockGroup % cellSize);
|
||||||
cells[0].size =
|
cells[0].size =
|
||||||
|
@ -373,7 +374,7 @@ public class StripedBlockUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 1; i < numCells - 1; i++) {
|
for (int i = 1; i < numCells - 1; i++) {
|
||||||
cells[i] = new StripingCell(ecSchema, i + firstCellIdxInBG);
|
cells[i] = new StripingCell(ecSchema, cellSize, i + firstCellIdxInBG);
|
||||||
}
|
}
|
||||||
|
|
||||||
return cells;
|
return cells;
|
||||||
|
@ -383,18 +384,16 @@ public class StripedBlockUtil {
|
||||||
* Given a logical start offset in a block group, calculate the physical
|
* Given a logical start offset in a block group, calculate the physical
|
||||||
* start offset into each stored internal block.
|
* start offset into each stored internal block.
|
||||||
*/
|
*/
|
||||||
public static long[] getStartOffsetsForInternalBlocks(
|
public static long[] getStartOffsetsForInternalBlocks(ECSchema ecSchema,
|
||||||
ECSchema ecSchema, LocatedStripedBlock blockGroup,
|
int cellSize, LocatedStripedBlock blockGroup, long rangeStartInBlockGroup) {
|
||||||
long rangeStartInBlockGroup) {
|
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(
|
||||||
rangeStartInBlockGroup < blockGroup.getBlockSize());
|
rangeStartInBlockGroup < blockGroup.getBlockSize());
|
||||||
int dataBlkNum = ecSchema.getNumDataUnits();
|
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||||
int parityBlkNum = ecSchema.getNumParityUnits();
|
int parityBlkNum = ecSchema.getNumParityUnits();
|
||||||
int cellSize = ecSchema.getChunkSize();
|
|
||||||
long[] startOffsets = new long[dataBlkNum + parityBlkNum];
|
long[] startOffsets = new long[dataBlkNum + parityBlkNum];
|
||||||
Arrays.fill(startOffsets, -1L);
|
Arrays.fill(startOffsets, -1L);
|
||||||
int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
|
int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
|
||||||
StripingCell firstCell = new StripingCell(ecSchema, firstCellIdxInBG);
|
StripingCell firstCell = new StripingCell(ecSchema, cellSize, firstCellIdxInBG);
|
||||||
firstCell.offset = (int) (rangeStartInBlockGroup % cellSize);
|
firstCell.offset = (int) (rangeStartInBlockGroup % cellSize);
|
||||||
startOffsets[firstCell.idxInStripe] =
|
startOffsets[firstCell.idxInStripe] =
|
||||||
firstCell.idxInInternalBlk * cellSize + firstCell.offset;
|
firstCell.idxInInternalBlk * cellSize + firstCell.offset;
|
||||||
|
@ -404,7 +403,7 @@ public class StripedBlockUtil {
|
||||||
if (idx * cellSize >= blockGroup.getBlockSize()) {
|
if (idx * cellSize >= blockGroup.getBlockSize()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
StripingCell cell = new StripingCell(ecSchema, idx);
|
StripingCell cell = new StripingCell(ecSchema, cellSize, idx);
|
||||||
startOffsets[cell.idxInStripe] = cell.idxInInternalBlk * cellSize;
|
startOffsets[cell.idxInStripe] = cell.idxInInternalBlk * cellSize;
|
||||||
if (startOffsets[cell.idxInStripe] < earliestStart) {
|
if (startOffsets[cell.idxInStripe] < earliestStart) {
|
||||||
earliestStart = startOffsets[cell.idxInStripe];
|
earliestStart = startOffsets[cell.idxInStripe];
|
||||||
|
@ -422,8 +421,7 @@ public class StripedBlockUtil {
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
private static VerticalRange[] getRangesForInternalBlocks(ECSchema ecSchema,
|
private static VerticalRange[] getRangesForInternalBlocks(ECSchema ecSchema,
|
||||||
StripingCell[] cells) {
|
int cellSize, StripingCell[] cells) {
|
||||||
int cellSize = ecSchema.getChunkSize();
|
|
||||||
int dataBlkNum = ecSchema.getNumDataUnits();
|
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||||
int parityBlkNum = ecSchema.getNumParityUnits();
|
int parityBlkNum = ecSchema.getNumParityUnits();
|
||||||
|
|
||||||
|
@ -486,7 +484,7 @@ public class StripedBlockUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void calcualteChunkPositionsInBuf(ECSchema ecSchema,
|
private static void calcualteChunkPositionsInBuf(ECSchema ecSchema,
|
||||||
AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
|
int cellSize, AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
|
||||||
int offsetInBuf) {
|
int offsetInBuf) {
|
||||||
/**
|
/**
|
||||||
* | <--------------- AlignedStripe --------------->|
|
* | <--------------- AlignedStripe --------------->|
|
||||||
|
@ -505,7 +503,6 @@ public class StripedBlockUtil {
|
||||||
*
|
*
|
||||||
* Cell indexing convention defined in {@link StripingCell}
|
* Cell indexing convention defined in {@link StripingCell}
|
||||||
*/
|
*/
|
||||||
int cellSize = ecSchema.getChunkSize();
|
|
||||||
int done = 0;
|
int done = 0;
|
||||||
for (StripingCell cell : cells) {
|
for (StripingCell cell : cells) {
|
||||||
long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
|
long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
|
||||||
|
@ -587,17 +584,17 @@ public class StripedBlockUtil {
|
||||||
int offset;
|
int offset;
|
||||||
int size;
|
int size;
|
||||||
|
|
||||||
StripingCell(ECSchema ecSchema, int idxInBlkGroup) {
|
StripingCell(ECSchema ecSchema, int cellSize, int idxInBlkGroup) {
|
||||||
this.schema = ecSchema;
|
this.schema = ecSchema;
|
||||||
this.idxInBlkGroup = idxInBlkGroup;
|
this.idxInBlkGroup = idxInBlkGroup;
|
||||||
this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits();
|
this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits();
|
||||||
this.idxInStripe = idxInBlkGroup -
|
this.idxInStripe = idxInBlkGroup -
|
||||||
this.idxInInternalBlk * ecSchema.getNumDataUnits();
|
this.idxInInternalBlk * ecSchema.getNumDataUnits();
|
||||||
this.offset = 0;
|
this.offset = 0;
|
||||||
this.size = ecSchema.getChunkSize();
|
this.size = cellSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
StripingCell(ECSchema ecSchema, int idxInInternalBlk,
|
StripingCell(ECSchema ecSchema, int cellSize, int idxInInternalBlk,
|
||||||
int idxInStripe) {
|
int idxInStripe) {
|
||||||
this.schema = ecSchema;
|
this.schema = ecSchema;
|
||||||
this.idxInInternalBlk = idxInInternalBlk;
|
this.idxInInternalBlk = idxInInternalBlk;
|
||||||
|
@ -605,7 +602,7 @@ public class StripedBlockUtil {
|
||||||
this.idxInBlkGroup =
|
this.idxInBlkGroup =
|
||||||
idxInInternalBlk * ecSchema.getNumDataUnits() + idxInStripe;
|
idxInInternalBlk * ecSchema.getNumDataUnits() + idxInStripe;
|
||||||
this.offset = 0;
|
this.offset = 0;
|
||||||
this.size = ecSchema.getChunkSize();
|
this.size = cellSize;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,11 +37,13 @@ message ErasureCodingInfoProto {
|
||||||
message ErasureCodingZoneInfoProto {
|
message ErasureCodingZoneInfoProto {
|
||||||
required string dir = 1;
|
required string dir = 1;
|
||||||
required ECSchemaProto schema = 2;
|
required ECSchemaProto schema = 2;
|
||||||
|
required uint32 cellSize = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message CreateErasureCodingZoneRequestProto {
|
message CreateErasureCodingZoneRequestProto {
|
||||||
required string src = 1;
|
required string src = 1;
|
||||||
optional ECSchemaProto schema = 2;
|
optional ECSchemaProto schema = 2;
|
||||||
|
optional uint32 cellSize = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message CreateErasureCodingZoneResponseProto {
|
message CreateErasureCodingZoneResponseProto {
|
||||||
|
@ -81,4 +83,5 @@ message BlockECRecoveryInfoProto {
|
||||||
required StorageTypesProto targetStorageTypes = 5;
|
required StorageTypesProto targetStorageTypes = 5;
|
||||||
repeated uint32 liveBlockIndices = 6;
|
repeated uint32 liveBlockIndices = 6;
|
||||||
required ECSchemaProto ecSchema = 7;
|
required ECSchemaProto ecSchema = 7;
|
||||||
|
required uint32 cellSize = 8;
|
||||||
}
|
}
|
|
@ -359,7 +359,8 @@ message HdfsFileStatusProto {
|
||||||
|
|
||||||
// Optional field for erasure coding
|
// Optional field for erasure coding
|
||||||
optional ECSchemaProto ecSchema = 17;
|
optional ECSchemaProto ecSchema = 17;
|
||||||
}
|
optional uint32 stripeCellSize = 18;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checksum algorithms/types used in HDFS
|
* Checksum algorithms/types used in HDFS
|
||||||
|
|
|
@ -1867,7 +1867,7 @@ public class DFSTestUtil {
|
||||||
assert dir != null;
|
assert dir != null;
|
||||||
dfs.mkdirs(dir);
|
dfs.mkdirs(dir);
|
||||||
try {
|
try {
|
||||||
dfs.getClient().createErasureCodingZone(dir.toString(), null);
|
dfs.getClient().createErasureCodingZone(dir.toString(), null, 0);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
if (!e.getMessage().contains("non-empty directory")) {
|
if (!e.getMessage().contains("non-empty directory")) {
|
||||||
throw e;
|
throw e;
|
||||||
|
|
|
@ -255,12 +255,12 @@ public class TestDFSClientRetries {
|
||||||
Mockito.doReturn(
|
Mockito.doReturn(
|
||||||
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
||||||
(short) 777), "owner", "group", new byte[0], new byte[0],
|
(short) 777), "owner", "group", new byte[0], new byte[0],
|
||||||
1010, 0, null, (byte) 0, null)).when(mockNN).getFileInfo(anyString());
|
1010, 0, null, (byte) 0, null, 0)).when(mockNN).getFileInfo(anyString());
|
||||||
|
|
||||||
Mockito.doReturn(
|
Mockito.doReturn(
|
||||||
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
||||||
(short) 777), "owner", "group", new byte[0], new byte[0],
|
(short) 777), "owner", "group", new byte[0], new byte[0],
|
||||||
1010, 0, null, (byte) 0, null))
|
1010, 0, null, (byte) 0, null, 0))
|
||||||
.when(mockNN)
|
.when(mockNN)
|
||||||
.create(anyString(), (FsPermission) anyObject(), anyString(),
|
.create(anyString(), (FsPermission) anyObject(), anyString(),
|
||||||
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
|
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
|
||||||
|
|
|
@ -73,7 +73,7 @@ public class TestDFSStripedInputStream {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
fs.mkdirs(dirPath);
|
fs.mkdirs(dirPath);
|
||||||
fs.getClient().createErasureCodingZone(dirPath.toString(), null);
|
fs.getClient().createErasureCodingZone(dirPath.toString(), null, CELLSIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -94,7 +94,7 @@ public class TestDFSStripedInputStream {
|
||||||
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
||||||
filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
|
filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
|
||||||
final DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
|
final DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
|
||||||
filePath.toString(), false, schema);
|
filePath.toString(), false, schema, CELLSIZE);
|
||||||
|
|
||||||
List<LocatedBlock> lbList = lbs.getLocatedBlocks();
|
List<LocatedBlock> lbList = lbs.getLocatedBlocks();
|
||||||
for (LocatedBlock aLbList : lbList) {
|
for (LocatedBlock aLbList : lbList) {
|
||||||
|
@ -146,7 +146,7 @@ public class TestDFSStripedInputStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
|
DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
|
||||||
filePath.toString(), false, schema);
|
filePath.toString(), false, schema, CELLSIZE);
|
||||||
|
|
||||||
int[] startOffsets = {0, 1, CELLSIZE - 102, CELLSIZE, CELLSIZE + 102,
|
int[] startOffsets = {0, 1, CELLSIZE - 102, CELLSIZE, CELLSIZE + 102,
|
||||||
CELLSIZE*DATA_BLK_NUM, CELLSIZE*DATA_BLK_NUM + 102,
|
CELLSIZE*DATA_BLK_NUM, CELLSIZE*DATA_BLK_NUM + 102,
|
||||||
|
@ -188,7 +188,7 @@ public class TestDFSStripedInputStream {
|
||||||
}
|
}
|
||||||
DFSStripedInputStream in =
|
DFSStripedInputStream in =
|
||||||
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
|
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
|
||||||
ErasureCodingSchemaManager.getSystemDefaultSchema());
|
ErasureCodingSchemaManager.getSystemDefaultSchema(), CELLSIZE);
|
||||||
int readSize = BLOCK_GROUP_SIZE;
|
int readSize = BLOCK_GROUP_SIZE;
|
||||||
byte[] readBuffer = new byte[readSize];
|
byte[] readBuffer = new byte[readSize];
|
||||||
byte[] expected = new byte[readSize];
|
byte[] expected = new byte[readSize];
|
||||||
|
@ -284,7 +284,7 @@ public class TestDFSStripedInputStream {
|
||||||
|
|
||||||
DFSStripedInputStream in =
|
DFSStripedInputStream in =
|
||||||
new DFSStripedInputStream(fs.getClient(), filePath.toString(),
|
new DFSStripedInputStream(fs.getClient(), filePath.toString(),
|
||||||
false, schema);
|
false, schema, CELLSIZE);
|
||||||
|
|
||||||
byte[] expected = new byte[fileSize];
|
byte[] expected = new byte[fileSize];
|
||||||
|
|
||||||
|
|
|
@ -65,7 +65,7 @@ public class TestDFSStripedOutputStream {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
||||||
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
|
cluster.getFileSystem().getClient().createErasureCodingZone("/", null, 0);
|
||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
dfs = cluster.getFileSystem();
|
dfs = cluster.getFileSystem();
|
||||||
dfs.mkdirs(dir);
|
dfs.mkdirs(dir);
|
||||||
dfs.createErasureCodingZone(dir, null);
|
dfs.createErasureCodingZone(dir, null, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
|
@ -737,7 +737,7 @@ public class TestEncryptionZones {
|
||||||
version, new byte[suite.getAlgorithmBlockSize()],
|
version, new byte[suite.getAlgorithmBlockSize()],
|
||||||
new byte[suite.getAlgorithmBlockSize()],
|
new byte[suite.getAlgorithmBlockSize()],
|
||||||
"fakeKey", "fakeVersion"),
|
"fakeKey", "fakeVersion"),
|
||||||
(byte) 0, null))
|
(byte) 0, null, 0))
|
||||||
.when(mcp)
|
.when(mcp)
|
||||||
.create(anyString(), (FsPermission) anyObject(), anyString(),
|
.create(anyString(), (FsPermission) anyObject(), anyString(),
|
||||||
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
|
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
|
||||||
|
|
|
@ -63,7 +63,7 @@ public class TestErasureCodingZones {
|
||||||
fs.mkdir(testDir, FsPermission.getDirDefault());
|
fs.mkdir(testDir, FsPermission.getDirDefault());
|
||||||
|
|
||||||
/* Normal creation of an erasure coding zone */
|
/* Normal creation of an erasure coding zone */
|
||||||
fs.getClient().createErasureCodingZone(testDir.toString(), null);
|
fs.getClient().createErasureCodingZone(testDir.toString(), null, 0);
|
||||||
|
|
||||||
/* Verify files under the zone are striped */
|
/* Verify files under the zone are striped */
|
||||||
final Path ECFilePath = new Path(testDir, "foo");
|
final Path ECFilePath = new Path(testDir, "foo");
|
||||||
|
@ -76,7 +76,7 @@ public class TestErasureCodingZones {
|
||||||
fs.mkdir(notEmpty, FsPermission.getDirDefault());
|
fs.mkdir(notEmpty, FsPermission.getDirDefault());
|
||||||
fs.create(new Path(notEmpty, "foo"));
|
fs.create(new Path(notEmpty, "foo"));
|
||||||
try {
|
try {
|
||||||
fs.getClient().createErasureCodingZone(notEmpty.toString(), null);
|
fs.getClient().createErasureCodingZone(notEmpty.toString(), null, 0);
|
||||||
fail("Erasure coding zone on non-empty dir");
|
fail("Erasure coding zone on non-empty dir");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
assertExceptionContains("erasure coding zone for a non-empty directory", e);
|
assertExceptionContains("erasure coding zone for a non-empty directory", e);
|
||||||
|
@ -86,10 +86,10 @@ public class TestErasureCodingZones {
|
||||||
final Path zone1 = new Path("/zone1");
|
final Path zone1 = new Path("/zone1");
|
||||||
final Path zone2 = new Path(zone1, "zone2");
|
final Path zone2 = new Path(zone1, "zone2");
|
||||||
fs.mkdir(zone1, FsPermission.getDirDefault());
|
fs.mkdir(zone1, FsPermission.getDirDefault());
|
||||||
fs.getClient().createErasureCodingZone(zone1.toString(), null);
|
fs.getClient().createErasureCodingZone(zone1.toString(), null, 0);
|
||||||
fs.mkdir(zone2, FsPermission.getDirDefault());
|
fs.mkdir(zone2, FsPermission.getDirDefault());
|
||||||
try {
|
try {
|
||||||
fs.getClient().createErasureCodingZone(zone2.toString(), null);
|
fs.getClient().createErasureCodingZone(zone2.toString(), null, 0);
|
||||||
fail("Nested erasure coding zones");
|
fail("Nested erasure coding zones");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
assertExceptionContains("already in an erasure coding zone", e);
|
assertExceptionContains("already in an erasure coding zone", e);
|
||||||
|
@ -99,7 +99,7 @@ public class TestErasureCodingZones {
|
||||||
final Path fPath = new Path("/file");
|
final Path fPath = new Path("/file");
|
||||||
fs.create(fPath);
|
fs.create(fPath);
|
||||||
try {
|
try {
|
||||||
fs.getClient().createErasureCodingZone(fPath.toString(), null);
|
fs.getClient().createErasureCodingZone(fPath.toString(), null, 0);
|
||||||
fail("Erasure coding zone on file");
|
fail("Erasure coding zone on file");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
assertExceptionContains("erasure coding zone for a file", e);
|
assertExceptionContains("erasure coding zone for a file", e);
|
||||||
|
@ -112,8 +112,8 @@ public class TestErasureCodingZones {
|
||||||
final Path dstECDir = new Path("/dstEC");
|
final Path dstECDir = new Path("/dstEC");
|
||||||
fs.mkdir(srcECDir, FsPermission.getDirDefault());
|
fs.mkdir(srcECDir, FsPermission.getDirDefault());
|
||||||
fs.mkdir(dstECDir, FsPermission.getDirDefault());
|
fs.mkdir(dstECDir, FsPermission.getDirDefault());
|
||||||
fs.getClient().createErasureCodingZone(srcECDir.toString(), null);
|
fs.getClient().createErasureCodingZone(srcECDir.toString(), null, 0);
|
||||||
fs.getClient().createErasureCodingZone(dstECDir.toString(), null);
|
fs.getClient().createErasureCodingZone(dstECDir.toString(), null, 0);
|
||||||
final Path srcFile = new Path(srcECDir, "foo");
|
final Path srcFile = new Path(srcECDir, "foo");
|
||||||
fs.create(srcFile);
|
fs.create(srcFile);
|
||||||
|
|
||||||
|
@ -157,7 +157,7 @@ public class TestErasureCodingZones {
|
||||||
// dir ECInfo before creating ec zone
|
// dir ECInfo before creating ec zone
|
||||||
assertNull(fs.getClient().getErasureCodingInfo(src));
|
assertNull(fs.getClient().getErasureCodingInfo(src));
|
||||||
// dir ECInfo after creating ec zone
|
// dir ECInfo after creating ec zone
|
||||||
fs.getClient().createErasureCodingZone(src, null); //Default one will be used.
|
fs.getClient().createErasureCodingZone(src, null, 0); //Default one will be used.
|
||||||
ECSchema sysDefaultSchema = ErasureCodingSchemaManager.getSystemDefaultSchema();
|
ECSchema sysDefaultSchema = ErasureCodingSchemaManager.getSystemDefaultSchema();
|
||||||
verifyErasureCodingInfo(src, sysDefaultSchema);
|
verifyErasureCodingInfo(src, sysDefaultSchema);
|
||||||
fs.create(new Path(ecDir, "/child1")).close();
|
fs.create(new Path(ecDir, "/child1")).close();
|
||||||
|
@ -178,7 +178,7 @@ public class TestErasureCodingZones {
|
||||||
// dir ECInfo before creating ec zone
|
// dir ECInfo before creating ec zone
|
||||||
assertNull(fs.getClient().getErasureCodingInfo(src));
|
assertNull(fs.getClient().getErasureCodingInfo(src));
|
||||||
// dir ECInfo after creating ec zone
|
// dir ECInfo after creating ec zone
|
||||||
fs.getClient().createErasureCodingZone(src, usingSchema);
|
fs.getClient().createErasureCodingZone(src, usingSchema, 0);
|
||||||
verifyErasureCodingInfo(src, usingSchema);
|
verifyErasureCodingInfo(src, usingSchema);
|
||||||
fs.create(new Path(ecDir, "/child1")).close();
|
fs.create(new Path(ecDir, "/child1")).close();
|
||||||
// verify for the files in ec zone
|
// verify for the files in ec zone
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class TestFileStatusWithECschema {
|
||||||
|
|
||||||
final ECSchema schema1 = ErasureCodingSchemaManager.getSystemDefaultSchema();
|
final ECSchema schema1 = ErasureCodingSchemaManager.getSystemDefaultSchema();
|
||||||
// create EC zone on dir
|
// create EC zone on dir
|
||||||
fs.createErasureCodingZone(dir, schema1);
|
fs.createErasureCodingZone(dir, schema1, 0);
|
||||||
final ECSchema schame2 = client.getFileInfo(dir.toUri().getPath()).getECSchema();
|
final ECSchema schame2 = client.getFileInfo(dir.toUri().getPath()).getECSchema();
|
||||||
assertNotNull(schame2);
|
assertNotNull(schame2);
|
||||||
assertTrue(schema1.equals(schame2));
|
assertTrue(schema1.equals(schame2));
|
||||||
|
|
|
@ -354,12 +354,12 @@ public class TestLease {
|
||||||
Mockito.doReturn(
|
Mockito.doReturn(
|
||||||
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
||||||
(short) 777), "owner", "group", new byte[0], new byte[0],
|
(short) 777), "owner", "group", new byte[0], new byte[0],
|
||||||
1010, 0, null, (byte) 0, null)).when(mcp).getFileInfo(anyString());
|
1010, 0, null, (byte) 0, null, 0)).when(mcp).getFileInfo(anyString());
|
||||||
Mockito
|
Mockito
|
||||||
.doReturn(
|
.doReturn(
|
||||||
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
||||||
(short) 777), "owner", "group", new byte[0], new byte[0],
|
(short) 777), "owner", "group", new byte[0], new byte[0],
|
||||||
1010, 0, null, (byte) 0, null))
|
1010, 0, null, (byte) 0, null, 0))
|
||||||
.when(mcp)
|
.when(mcp)
|
||||||
.create(anyString(), (FsPermission) anyObject(), anyString(),
|
.create(anyString(), (FsPermission) anyObject(), anyString(),
|
||||||
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
|
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
|
||||||
|
|
|
@ -77,7 +77,7 @@ public class TestRecoverStripedFile {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
fs.getClient().createErasureCodingZone("/", null);
|
fs.getClient().createErasureCodingZone("/", null, 0);
|
||||||
|
|
||||||
List<DataNode> datanodes = cluster.getDataNodes();
|
List<DataNode> datanodes = cluster.getDataNodes();
|
||||||
for (int i = 0; i < dnNum; i++) {
|
for (int i = 0; i < dnNum; i++) {
|
||||||
|
|
|
@ -59,7 +59,7 @@ public class TestWriteReadStripedFile {
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
||||||
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
|
cluster.getFileSystem().getClient().createErasureCodingZone("/", null, cellSize);
|
||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -663,7 +663,8 @@ public class TestPBHelper {
|
||||||
short[] liveBlkIndices0 = new short[2];
|
short[] liveBlkIndices0 = new short[2];
|
||||||
BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
|
BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
|
||||||
new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
|
new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
|
||||||
liveBlkIndices0, ErasureCodingSchemaManager.getSystemDefaultSchema());
|
liveBlkIndices0, ErasureCodingSchemaManager.getSystemDefaultSchema(),
|
||||||
|
64 * 1024);
|
||||||
DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
|
DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
|
||||||
DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
|
DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
|
||||||
DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil
|
DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil
|
||||||
|
@ -677,7 +678,8 @@ public class TestPBHelper {
|
||||||
short[] liveBlkIndices1 = new short[2];
|
short[] liveBlkIndices1 = new short[2];
|
||||||
BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
|
BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
|
||||||
new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
|
new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
|
||||||
liveBlkIndices1, ErasureCodingSchemaManager.getSystemDefaultSchema());
|
liveBlkIndices1, ErasureCodingSchemaManager.getSystemDefaultSchema(),
|
||||||
|
64 * 1024);
|
||||||
List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>();
|
List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>();
|
||||||
blkRecoveryInfosList.add(blkECRecoveryInfo0);
|
blkRecoveryInfosList.add(blkECRecoveryInfo0);
|
||||||
blkRecoveryInfosList.add(blkECRecoveryInfo1);
|
blkRecoveryInfosList.add(blkECRecoveryInfo1);
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class TestAddStripedBlocks {
|
||||||
.numDataNodes(GROUP_SIZE).build();
|
.numDataNodes(GROUP_SIZE).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
dfs = cluster.getFileSystem();
|
dfs = cluster.getFileSystem();
|
||||||
dfs.getClient().createErasureCodingZone("/", null);
|
dfs.getClient().createErasureCodingZone("/", null, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
|
@ -451,7 +451,7 @@ public class TestFSEditLogLoader {
|
||||||
|
|
||||||
//set the storage policy of the directory
|
//set the storage policy of the directory
|
||||||
fs.mkdir(new Path(testDir), new FsPermission("755"));
|
fs.mkdir(new Path(testDir), new FsPermission("755"));
|
||||||
fs.getClient().getNamenode().createErasureCodingZone(testDir, null);
|
fs.getClient().getNamenode().createErasureCodingZone(testDir, null, 0);
|
||||||
|
|
||||||
// Create a file with striped block
|
// Create a file with striped block
|
||||||
Path p = new Path(testFilePath);
|
Path p = new Path(testFilePath);
|
||||||
|
@ -523,7 +523,7 @@ public class TestFSEditLogLoader {
|
||||||
|
|
||||||
//set the storage policy of the directory
|
//set the storage policy of the directory
|
||||||
fs.mkdir(new Path(testDir), new FsPermission("755"));
|
fs.mkdir(new Path(testDir), new FsPermission("755"));
|
||||||
fs.getClient().getNamenode().createErasureCodingZone(testDir, null);
|
fs.getClient().getNamenode().createErasureCodingZone(testDir, null, 0);
|
||||||
|
|
||||||
//create a file with striped blocks
|
//create a file with striped blocks
|
||||||
Path p = new Path(testFilePath);
|
Path p = new Path(testFilePath);
|
||||||
|
|
|
@ -140,7 +140,7 @@ public class TestFSImage {
|
||||||
private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration conf,
|
private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration conf,
|
||||||
boolean isUC) throws IOException{
|
boolean isUC) throws IOException{
|
||||||
// contruct a INode with StripedBlock for saving and loading
|
// contruct a INode with StripedBlock for saving and loading
|
||||||
fsn.createErasureCodingZone("/", null, false);
|
fsn.createErasureCodingZone("/", null, 0, false);
|
||||||
long id = 123456789;
|
long id = 123456789;
|
||||||
byte[] name = "testSaveAndLoadInodeFile_testfile".getBytes();
|
byte[] name = "testSaveAndLoadInodeFile_testfile".getBytes();
|
||||||
PermissionStatus permissionStatus = new PermissionStatus("testuser_a",
|
PermissionStatus permissionStatus = new PermissionStatus("testuser_a",
|
||||||
|
@ -402,7 +402,7 @@ public class TestFSImage {
|
||||||
.build();
|
.build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
DistributedFileSystem fs = cluster.getFileSystem();
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
fs.getClient().getNamenode().createErasureCodingZone("/", null);
|
fs.getClient().getNamenode().createErasureCodingZone("/", null, 0);
|
||||||
Path file = new Path("/striped");
|
Path file = new Path("/striped");
|
||||||
FSDataOutputStream out = fs.create(file);
|
FSDataOutputStream out = fs.create(file);
|
||||||
byte[] bytes = DFSTestUtil.generateSequentialBytes(0, BLOCK_SIZE);
|
byte[] bytes = DFSTestUtil.generateSequentialBytes(0, BLOCK_SIZE);
|
||||||
|
|
|
@ -1198,7 +1198,7 @@ public class TestFsck {
|
||||||
|
|
||||||
HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication,
|
HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication,
|
||||||
blockSize, modTime, accessTime, perms, owner, group, symlink,
|
blockSize, modTime, accessTime, perms, owner, group, symlink,
|
||||||
path, fileId, numChildren, null, storagePolicy, null);
|
path, fileId, numChildren, null, storagePolicy, null, 0);
|
||||||
Result res = new Result(conf);
|
Result res = new Result(conf);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -1629,4 +1629,4 @@ public class TestFsck {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,7 @@ public class TestQuotaWithStripedBlocks {
|
||||||
dfs = cluster.getFileSystem();
|
dfs = cluster.getFileSystem();
|
||||||
|
|
||||||
dfs.mkdirs(ecDir);
|
dfs.mkdirs(ecDir);
|
||||||
dfs.getClient().createErasureCodingZone(ecDir.toString(), ecSchema);
|
dfs.getClient().createErasureCodingZone(ecDir.toString(), ecSchema, 0);
|
||||||
dfs.setQuota(ecDir, Long.MAX_VALUE - 1, DISK_QUOTA);
|
dfs.setQuota(ecDir, Long.MAX_VALUE - 1, DISK_QUOTA);
|
||||||
dfs.setQuotaByStorageType(ecDir, StorageType.DISK, DISK_QUOTA);
|
dfs.setQuotaByStorageType(ecDir, StorageType.DISK, DISK_QUOTA);
|
||||||
dfs.setStoragePolicy(ecDir, HdfsServerConstants.HOT_STORAGE_POLICY_NAME);
|
dfs.setStoragePolicy(ecDir, HdfsServerConstants.HOT_STORAGE_POLICY_NAME);
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class TestOfflineImageViewerWithStripedBlocks {
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
|
cluster.getFileSystem().getClient().createErasureCodingZone("/", null, 0);
|
||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
Path eczone = new Path("/eczone");
|
Path eczone = new Path("/eczone");
|
||||||
fs.mkdirs(eczone);
|
fs.mkdirs(eczone);
|
||||||
|
|
|
@ -152,7 +152,7 @@ public class TestStripedBlockUtil {
|
||||||
int done = 0;
|
int done = 0;
|
||||||
while (done < bgSize) {
|
while (done < bgSize) {
|
||||||
Preconditions.checkState(done % CELLSIZE == 0);
|
Preconditions.checkState(done % CELLSIZE == 0);
|
||||||
StripingCell cell = new StripingCell(SCEHMA, done / CELLSIZE);
|
StripingCell cell = new StripingCell(SCEHMA, CELLSIZE, done / CELLSIZE);
|
||||||
int idxInStripe = cell.idxInStripe;
|
int idxInStripe = cell.idxInStripe;
|
||||||
int size = Math.min(CELLSIZE, bgSize - done);
|
int size = Math.min(CELLSIZE, bgSize - done);
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
|
@ -247,7 +247,7 @@ public class TestStripedBlockUtil {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
AlignedStripe[] stripes = divideByteRangeIntoStripes(SCEHMA,
|
AlignedStripe[] stripes = divideByteRangeIntoStripes(SCEHMA,
|
||||||
blockGroup, brStart, brStart + brSize - 1, assembled, 0);
|
CELLSIZE, blockGroup, brStart, brStart + brSize - 1, assembled, 0);
|
||||||
|
|
||||||
for (AlignedStripe stripe : stripes) {
|
for (AlignedStripe stripe : stripes) {
|
||||||
for (int i = 0; i < DATA_BLK_NUM; i++) {
|
for (int i = 0; i < DATA_BLK_NUM; i++) {
|
||||||
|
|
|
@ -65,7 +65,7 @@ public class TestJsonUtil {
|
||||||
final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26,
|
final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26,
|
||||||
now, now + 10, new FsPermission((short) 0644), "user", "group",
|
now, now + 10, new FsPermission((short) 0644), "user", "group",
|
||||||
DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
|
DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
|
||||||
HdfsConstants.GRANDFATHER_INODE_ID, 0, null, (byte) 0, null);
|
HdfsConstants.GRANDFATHER_INODE_ID, 0, null, (byte) 0, null, 0);
|
||||||
final FileStatus fstatus = toFileStatus(status, parent);
|
final FileStatus fstatus = toFileStatus(status, parent);
|
||||||
System.out.println("status = " + status);
|
System.out.println("status = " + status);
|
||||||
System.out.println("fstatus = " + fstatus);
|
System.out.println("fstatus = " + fstatus);
|
||||||
|
|
Loading…
Reference in New Issue