HDFS-7866. Erasure coding: NameNode manages multiple erasure coding policies. Contributed by Rui Li.
This commit is contained in:
parent
89b16d27e2
commit
7600e3c48f
|
@ -31,11 +31,25 @@ public final class ErasureCodingPolicy {
|
|||
private final String name;
|
||||
private final ECSchema schema;
|
||||
private final int cellSize;
|
||||
private final byte id;
|
||||
|
||||
public ErasureCodingPolicy(String name, ECSchema schema, int cellSize){
|
||||
public ErasureCodingPolicy(String name, ECSchema schema,
|
||||
int cellSize, byte id) {
|
||||
this.name = name;
|
||||
this.schema = schema;
|
||||
this.cellSize = cellSize;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public ErasureCodingPolicy(ECSchema schema, int cellSize, byte id) {
|
||||
this(composePolicyName(schema, cellSize), schema, cellSize, id);
|
||||
}
|
||||
|
||||
private static String composePolicyName(ECSchema schema, int cellSize) {
|
||||
assert cellSize % 1024 == 0;
|
||||
return schema.getCodecName().toUpperCase() + "-" +
|
||||
schema.getNumDataUnits() + "-" + schema.getNumParityUnits() +
|
||||
"-" + cellSize / 1024 + "k";
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
|
@ -58,6 +72,10 @@ public final class ErasureCodingPolicy {
|
|||
return schema.getNumParityUnits();
|
||||
}
|
||||
|
||||
public byte getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.Map;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
@ -144,6 +145,12 @@ public final class HdfsConstants {
|
|||
ALL, LIVE, DEAD, DECOMMISSIONING
|
||||
}
|
||||
|
||||
public static final ECSchema RS_6_3_SCHEMA = new ECSchema("rs", 6, 3);
|
||||
public static final byte RS_6_3_POLICY_ID = 0;
|
||||
|
||||
public static final ECSchema RS_3_2_SCHEMA = new ECSchema("rs", 3, 2);
|
||||
public static final byte RS_3_2_POLICY_ID = 1;
|
||||
|
||||
/* Hidden constructor */
|
||||
protected HdfsConstants() {
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ public class HdfsFileStatus {
|
|||
byte storagePolicy, ErasureCodingPolicy ecPolicy) {
|
||||
this.length = length;
|
||||
this.isdir = isdir;
|
||||
this.block_replication = (short)block_replication;
|
||||
this.block_replication = ecPolicy == null ? (short) block_replication : 0;
|
||||
this.blocksize = blocksize;
|
||||
this.modification_time = modification_time;
|
||||
this.access_time = access_time;
|
||||
|
|
|
@ -2487,7 +2487,7 @@ public class PBHelperClient {
|
|||
ErasureCodingPolicyProto policy) {
|
||||
return new ErasureCodingPolicy(policy.getName(),
|
||||
convertECSchema(policy.getSchema()),
|
||||
policy.getCellSize());
|
||||
policy.getCellSize(), (byte) policy.getId());
|
||||
}
|
||||
|
||||
public static ErasureCodingPolicyProto convertErasureCodingPolicy(
|
||||
|
@ -2496,7 +2496,8 @@ public class PBHelperClient {
|
|||
.newBuilder()
|
||||
.setName(policy.getName())
|
||||
.setSchema(convertECSchema(policy.getSchema()))
|
||||
.setCellSize(policy.getCellSize());
|
||||
.setCellSize(policy.getCellSize())
|
||||
.setId(policy.getId());
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -337,6 +337,7 @@ message ErasureCodingPolicyProto {
|
|||
required string name = 1;
|
||||
required ECSchemaProto schema = 2;
|
||||
required uint32 cellSize = 3;
|
||||
required uint32 id = 4; // Actually a byte - only 8 bits used
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -208,7 +208,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
|
|||
// highest risk of loss, highest priority
|
||||
return QUEUE_HIGHEST_PRIORITY;
|
||||
} else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
|
||||
// can only afford one replica loss
|
||||
// there is less than a third as many blocks as requested;
|
||||
// this is considered very under-replicated
|
||||
return QUEUE_VERY_UNDER_REPLICATED;
|
||||
} else {
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
@ -37,33 +37,29 @@ public final class ErasureCodingPolicyManager {
|
|||
/**
|
||||
* TODO: HDFS-8095
|
||||
*/
|
||||
private static final int DEFAULT_DATA_BLOCKS = 6;
|
||||
private static final int DEFAULT_PARITY_BLOCKS = 3;
|
||||
private static final int DEFAULT_CELLSIZE = 64 * 1024;
|
||||
private static final String DEFAULT_CODEC_NAME = "rs";
|
||||
private static final String DEFAULT_POLICY_NAME = "RS-6-3-64k";
|
||||
private static final ECSchema SYS_DEFAULT_SCHEMA = new ECSchema(
|
||||
DEFAULT_CODEC_NAME, DEFAULT_DATA_BLOCKS, DEFAULT_PARITY_BLOCKS);
|
||||
private static final ErasureCodingPolicy SYS_DEFAULT_POLICY =
|
||||
new ErasureCodingPolicy(DEFAULT_POLICY_NAME, SYS_DEFAULT_SCHEMA,
|
||||
DEFAULT_CELLSIZE);
|
||||
private static final ErasureCodingPolicy SYS_POLICY1 =
|
||||
new ErasureCodingPolicy(HdfsConstants.RS_6_3_SCHEMA, DEFAULT_CELLSIZE,
|
||||
HdfsConstants.RS_6_3_POLICY_ID);
|
||||
private static final ErasureCodingPolicy SYS_POLICY2 =
|
||||
new ErasureCodingPolicy(HdfsConstants.RS_3_2_SCHEMA, DEFAULT_CELLSIZE,
|
||||
HdfsConstants.RS_3_2_POLICY_ID);
|
||||
|
||||
//We may add more later.
|
||||
private static ErasureCodingPolicy[] SYS_POLICY = new ErasureCodingPolicy[] {
|
||||
SYS_DEFAULT_POLICY
|
||||
};
|
||||
private static final ErasureCodingPolicy[] SYS_POLICIES =
|
||||
new ErasureCodingPolicy[]{SYS_POLICY1, SYS_POLICY2};
|
||||
|
||||
/**
|
||||
* All active policies maintained in NN memory for fast querying,
|
||||
* identified and sorted by its name.
|
||||
*/
|
||||
private final Map<String, ErasureCodingPolicy> activePolicies;
|
||||
private final Map<String, ErasureCodingPolicy> activePoliciesByName;
|
||||
|
||||
ErasureCodingPolicyManager() {
|
||||
|
||||
this.activePolicies = new TreeMap<>();
|
||||
for (ErasureCodingPolicy policy : SYS_POLICY) {
|
||||
activePolicies.put(policy.getName(), policy);
|
||||
this.activePoliciesByName = new TreeMap<>();
|
||||
for (ErasureCodingPolicy policy : SYS_POLICIES) {
|
||||
activePoliciesByName.put(policy.getName(), policy);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -77,8 +73,8 @@ public final class ErasureCodingPolicyManager {
|
|||
* Get system defined policies.
|
||||
* @return system policies
|
||||
*/
|
||||
public static ErasureCodingPolicy[] getSystemPolices() {
|
||||
return SYS_POLICY;
|
||||
public static ErasureCodingPolicy[] getSystemPolicies() {
|
||||
return SYS_POLICIES;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -87,7 +83,8 @@ public final class ErasureCodingPolicyManager {
|
|||
* @return ecPolicy
|
||||
*/
|
||||
public static ErasureCodingPolicy getSystemDefaultPolicy() {
|
||||
return SYS_DEFAULT_POLICY;
|
||||
// make this configurable?
|
||||
return SYS_POLICY1;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -95,21 +92,34 @@ public final class ErasureCodingPolicyManager {
|
|||
* @return all policies
|
||||
*/
|
||||
public ErasureCodingPolicy[] getPolicies() {
|
||||
ErasureCodingPolicy[] results = new ErasureCodingPolicy[activePolicies.size()];
|
||||
return activePolicies.values().toArray(results);
|
||||
ErasureCodingPolicy[] results =
|
||||
new ErasureCodingPolicy[activePoliciesByName.size()];
|
||||
return activePoliciesByName.values().toArray(results);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the policy specified by the policy name.
|
||||
*/
|
||||
public ErasureCodingPolicy getPolicy(String name) {
|
||||
return activePolicies.get(name);
|
||||
public ErasureCodingPolicy getPolicyByName(String name) {
|
||||
return activePoliciesByName.get(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the policy specified by the policy ID.
|
||||
*/
|
||||
public ErasureCodingPolicy getPolicyByID(byte id) {
|
||||
for (ErasureCodingPolicy policy : activePoliciesByName.values()) {
|
||||
if (policy.getId() == id) {
|
||||
return policy;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear and clean up
|
||||
*/
|
||||
public void clear() {
|
||||
activePolicies.clear();
|
||||
activePoliciesByName.clear();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -248,12 +248,10 @@ final class FSDirErasureCodingOp {
|
|||
if (inode == null) {
|
||||
continue;
|
||||
}
|
||||
/**
|
||||
* TODO: lookup {@link ErasureCodingPolicyManager#getSystemPolices()}
|
||||
*/
|
||||
if (inode.isFile()) {
|
||||
return inode.asFile().getErasureCodingPolicyID() == 0 ?
|
||||
null : ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
byte id = inode.asFile().getErasureCodingPolicyID();
|
||||
return id < 0 ? null : fsd.getFSNamesystem().
|
||||
getErasureCodingPolicyManager().getPolicyByID(id);
|
||||
}
|
||||
// We don't allow setting EC policies on paths with a symlink. Thus
|
||||
// if a symlink is encountered, the dir shouldn't have EC policy.
|
||||
|
@ -269,7 +267,7 @@ final class FSDirErasureCodingOp {
|
|||
DataInputStream dIn = new DataInputStream(bIn);
|
||||
String ecPolicyName = WritableUtils.readString(dIn);
|
||||
return fsd.getFSNamesystem().getErasureCodingPolicyManager().
|
||||
getPolicy(ecPolicyName);
|
||||
getPolicyByName(ecPolicyName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -497,16 +497,19 @@ class FSDirWriteFileOp {
|
|||
assert fsd.hasWriteLock();
|
||||
try {
|
||||
// check if the file has an EC policy
|
||||
final boolean isStriped = FSDirErasureCodingOp.hasErasureCodingPolicy(
|
||||
fsd.getFSNamesystem(), existing);
|
||||
ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.
|
||||
getErasureCodingPolicy(fsd.getFSNamesystem(), existing);
|
||||
if (ecPolicy != null) {
|
||||
replication = ecPolicy.getId();
|
||||
}
|
||||
if (underConstruction) {
|
||||
newNode = newINodeFile(id, permissions, modificationTime,
|
||||
modificationTime, replication, preferredBlockSize, storagePolicyId,
|
||||
isStriped);
|
||||
ecPolicy != null);
|
||||
newNode.toUnderConstruction(clientName, clientMachine);
|
||||
} else {
|
||||
newNode = newINodeFile(id, permissions, modificationTime, atime,
|
||||
replication, preferredBlockSize, storagePolicyId, isStriped);
|
||||
replication, preferredBlockSize, storagePolicyId, ecPolicy != null);
|
||||
}
|
||||
newNode.setLocalName(localName);
|
||||
INodesInPath iip = fsd.addINode(existing, newNode);
|
||||
|
@ -595,10 +598,13 @@ class FSDirWriteFileOp {
|
|||
INodesInPath newiip;
|
||||
fsd.writeLock();
|
||||
try {
|
||||
final boolean isStriped = FSDirErasureCodingOp.hasErasureCodingPolicy(
|
||||
fsd.getFSNamesystem(), existing);
|
||||
ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.
|
||||
getErasureCodingPolicy(fsd.getFSNamesystem(), existing);
|
||||
if (ecPolicy != null) {
|
||||
replication = ecPolicy.getId();
|
||||
}
|
||||
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
|
||||
modTime, modTime, replication, preferredBlockSize, isStriped);
|
||||
modTime, modTime, replication, preferredBlockSize, ecPolicy != null);
|
||||
newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
|
||||
newNode.toUnderConstruction(clientName, clientMachine);
|
||||
newiip = fsd.addINode(existing, newNode);
|
||||
|
|
|
@ -82,24 +82,57 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
|
||||
/**
|
||||
* Bit format:
|
||||
* [4-bit storagePolicyID][1-bit isStriped]
|
||||
* [11-bit replication][48-bit preferredBlockSize]
|
||||
* [4-bit storagePolicyID][12-bit BLOCK_LAYOUT_AND_REDUNDANCY]
|
||||
* [48-bit preferredBlockSize]
|
||||
*
|
||||
* BLOCK_LAYOUT_AND_REDUNDANCY contains 12 bits and describes the layout and
|
||||
* redundancy of a block. We use the highest 1 bit to determine whether the
|
||||
* block is replica or erasure coded. For replica blocks, the tail 11 bits
|
||||
* stores the replication factor. For erasure coded blocks, the tail 11 bits
|
||||
* stores the EC policy ID, and in the future, we may further divide these
|
||||
* 11 bits to store both the EC policy ID and replication factor for erasure
|
||||
* coded blocks. The layout of this section is demonstrated as below.
|
||||
* +---------------+-------------------------------+
|
||||
* | 1 bit | 11 bit |
|
||||
* +---------------+-------------------------------+
|
||||
* | Replica or EC |Replica factor or EC policy ID |
|
||||
* +---------------+-------------------------------+
|
||||
*
|
||||
* BLOCK_LAYOUT_AND_REDUNDANCY format for replicated block:
|
||||
* 0 [11-bit replication]
|
||||
*
|
||||
* BLOCK_LAYOUT_AND_REDUNDANCY format for striped block:
|
||||
* 1 [11-bit ErasureCodingPolicy ID]
|
||||
*/
|
||||
enum HeaderFormat {
|
||||
PREFERRED_BLOCK_SIZE(null, 48, 1),
|
||||
REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 11, 0),
|
||||
IS_STRIPED(REPLICATION.BITS, 1, 0),
|
||||
STORAGE_POLICY_ID(IS_STRIPED.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH,
|
||||
0);
|
||||
BLOCK_LAYOUT_AND_REDUNDANCY(PREFERRED_BLOCK_SIZE.BITS,
|
||||
HeaderFormat.LAYOUT_BIT_WIDTH + 11, 0),
|
||||
STORAGE_POLICY_ID(BLOCK_LAYOUT_AND_REDUNDANCY.BITS,
|
||||
BlockStoragePolicySuite.ID_BIT_LENGTH, 0);
|
||||
|
||||
private final LongBitFormat BITS;
|
||||
|
||||
/**
|
||||
* Number of bits used to encode block layout type.
|
||||
* Different types can be replica or EC
|
||||
*/
|
||||
private static final int LAYOUT_BIT_WIDTH = 1;
|
||||
|
||||
private static final int MAX_REDUNDANCY = (1 << 11) - 1;
|
||||
|
||||
HeaderFormat(LongBitFormat previous, int length, long min) {
|
||||
BITS = new LongBitFormat(name(), previous, length, min);
|
||||
}
|
||||
|
||||
static short getReplication(long header) {
|
||||
return (short)REPLICATION.BITS.retrieve(header);
|
||||
long layoutRedundancy = BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
|
||||
return (short) (layoutRedundancy & MAX_REDUNDANCY);
|
||||
}
|
||||
|
||||
static byte getECPolicyID(long header) {
|
||||
long layoutRedundancy = BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
|
||||
return (byte) (layoutRedundancy & MAX_REDUNDANCY);
|
||||
}
|
||||
|
||||
static long getPreferredBlockSize(long header) {
|
||||
|
@ -111,26 +144,27 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
}
|
||||
|
||||
static boolean isStriped(long header) {
|
||||
long isStriped = IS_STRIPED.BITS.retrieve(header);
|
||||
Preconditions.checkState(isStriped == 0 || isStriped == 1);
|
||||
return isStriped == 1;
|
||||
long layoutRedundancy = BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
|
||||
return (layoutRedundancy & (1 << 11)) != 0;
|
||||
}
|
||||
|
||||
static long toLong(long preferredBlockSize, short replication,
|
||||
boolean isStriped, byte storagePolicyID) {
|
||||
Preconditions.checkArgument(replication >= 0 &&
|
||||
replication <= MAX_REDUNDANCY);
|
||||
long h = 0;
|
||||
if (preferredBlockSize == 0) {
|
||||
preferredBlockSize = PREFERRED_BLOCK_SIZE.BITS.getMin();
|
||||
}
|
||||
h = PREFERRED_BLOCK_SIZE.BITS.combine(preferredBlockSize, h);
|
||||
// Replication factor for striped files is zero
|
||||
// For erasure coded files, replication is used to store ec policy id
|
||||
// TODO: this is hacky. Add some utility to generate the layoutRedundancy
|
||||
long layoutRedundancy = 0;
|
||||
if (isStriped) {
|
||||
h = REPLICATION.BITS.combine(0L, h);
|
||||
h = IS_STRIPED.BITS.combine(1L, h);
|
||||
} else {
|
||||
h = REPLICATION.BITS.combine(replication, h);
|
||||
h = IS_STRIPED.BITS.combine(0L, h);
|
||||
layoutRedundancy |= 1 << 11;
|
||||
}
|
||||
layoutRedundancy |= replication;
|
||||
h = BLOCK_LAYOUT_AND_REDUNDANCY.BITS.combine(layoutRedundancy, h);
|
||||
h = STORAGE_POLICY_ID.BITS.combine(storagePolicyID, h);
|
||||
return h;
|
||||
}
|
||||
|
@ -401,9 +435,11 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
return HeaderFormat.getReplication(header);
|
||||
}
|
||||
|
||||
/** The same as getFileReplication(null). */
|
||||
/**
|
||||
* The same as getFileReplication(null).
|
||||
* For erasure coded files, this returns the EC policy ID.
|
||||
* */
|
||||
@Override // INodeFileAttributes
|
||||
// TODO properly handle striped files
|
||||
public final short getFileReplication() {
|
||||
return getFileReplication(CURRENT_STATE_ID);
|
||||
}
|
||||
|
@ -429,7 +465,12 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
|
||||
/** Set the replication factor of this file. */
|
||||
private void setFileReplication(short replication) {
|
||||
header = HeaderFormat.REPLICATION.BITS.combine(replication, header);
|
||||
long layoutRedundancy =
|
||||
HeaderFormat.BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
|
||||
layoutRedundancy = (layoutRedundancy &
|
||||
~HeaderFormat.MAX_REDUNDANCY) | replication;
|
||||
header = HeaderFormat.BLOCK_LAYOUT_AND_REDUNDANCY.BITS.
|
||||
combine(layoutRedundancy, header);
|
||||
}
|
||||
|
||||
/** Set the replication factor of this file. */
|
||||
|
@ -474,16 +515,16 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
|
||||
|
||||
/**
|
||||
* @return The ID of the erasure coding policy on the file. 0 represents no
|
||||
* EC policy (file is in contiguous format). 1 represents the system
|
||||
* default EC policy:
|
||||
* {@link ErasureCodingPolicyManager#SYS_DEFAULT_POLICY}.
|
||||
* TODO: support more policies by reusing {@link HeaderFormat#REPLICATION}.
|
||||
* @return The ID of the erasure coding policy on the file. -1 represents no
|
||||
* EC policy.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@Override
|
||||
public byte getErasureCodingPolicyID() {
|
||||
return isStriped() ? (byte)1 : (byte)0;
|
||||
if (isStriped()) {
|
||||
return HeaderFormat.getECPolicyID(header);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -27,26 +27,26 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile.HeaderFormat;
|
|||
@InterfaceAudience.Private
|
||||
public interface INodeFileAttributes extends INodeAttributes {
|
||||
/** @return the file replication. */
|
||||
public short getFileReplication();
|
||||
short getFileReplication();
|
||||
|
||||
/** @return whether the file is striped (instead of contiguous) */
|
||||
public boolean isStriped();
|
||||
boolean isStriped();
|
||||
|
||||
/** @return whether the file is striped (instead of contiguous) */
|
||||
public byte getErasureCodingPolicyID();
|
||||
/** @return the ID of the ErasureCodingPolicy */
|
||||
byte getErasureCodingPolicyID();
|
||||
|
||||
/** @return preferred block size in bytes */
|
||||
public long getPreferredBlockSize();
|
||||
long getPreferredBlockSize();
|
||||
|
||||
/** @return the header as a long. */
|
||||
public long getHeaderLong();
|
||||
long getHeaderLong();
|
||||
|
||||
public boolean metadataEquals(INodeFileAttributes other);
|
||||
boolean metadataEquals(INodeFileAttributes other);
|
||||
|
||||
public byte getLocalStoragePolicyID();
|
||||
byte getLocalStoragePolicyID();
|
||||
|
||||
/** A copy of the inode file attributes */
|
||||
public static class SnapshotCopy extends INodeAttributes.SnapshotCopy
|
||||
static class SnapshotCopy extends INodeAttributes.SnapshotCopy
|
||||
implements INodeFileAttributes {
|
||||
private final long header;
|
||||
|
||||
|
@ -82,7 +82,10 @@ public interface INodeFileAttributes extends INodeAttributes {
|
|||
|
||||
@Override
|
||||
public byte getErasureCodingPolicyID() {
|
||||
return isStriped() ? (byte)1 : (byte)0;
|
||||
if (isStriped()) {
|
||||
return HeaderFormat.getECPolicyID(header);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.junit.Test;
|
|||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||
|
@ -225,8 +226,11 @@ public class TestErasureCodingPolicies {
|
|||
fs.create(fooFile, FsPermission.getFileDefault(), true,
|
||||
conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
|
||||
(short)0, fs.getDefaultBlockSize(fooFile), null);
|
||||
ErasureCodingPolicy policy = fs.getErasureCodingPolicy(fooFile);
|
||||
// set replication should be a no-op
|
||||
fs.setReplication(fooFile, (short) 3);
|
||||
// should preserve the policy after set replication
|
||||
assertEquals(policy, fs.getErasureCodingPolicy(fooFile));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
|
@ -247,9 +251,10 @@ public class TestErasureCodingPolicies {
|
|||
|
||||
@Test(timeout = 60000)
|
||||
public void testGetErasureCodingPolicy() throws Exception {
|
||||
ErasureCodingPolicy[] sysECPolicies = ErasureCodingPolicyManager.getSystemPolices();
|
||||
assertTrue("System ecPolicies should be of only 1 for now",
|
||||
sysECPolicies.length == 1);
|
||||
ErasureCodingPolicy[] sysECPolicies =
|
||||
ErasureCodingPolicyManager.getSystemPolicies();
|
||||
assertTrue("System ecPolicies should exist",
|
||||
sysECPolicies.length > 0);
|
||||
|
||||
ErasureCodingPolicy usingECPolicy = sysECPolicies[0];
|
||||
String src = "/ec2";
|
||||
|
@ -281,7 +286,7 @@ public class TestErasureCodingPolicies {
|
|||
String policyName = "RS-4-2-128k";
|
||||
int cellSize = 128 * 1024;
|
||||
ErasureCodingPolicy ecPolicy=
|
||||
new ErasureCodingPolicy(policyName,rsSchema,cellSize);
|
||||
new ErasureCodingPolicy(policyName, rsSchema, cellSize, (byte) -1);
|
||||
String src = "/ecDir4-2";
|
||||
final Path ecDir = new Path(src);
|
||||
try {
|
||||
|
@ -298,16 +303,11 @@ public class TestErasureCodingPolicies {
|
|||
@Test(timeout = 60000)
|
||||
public void testGetAllErasureCodingPolicies() throws Exception {
|
||||
ErasureCodingPolicy[] sysECPolicies = ErasureCodingPolicyManager
|
||||
.getSystemPolices();
|
||||
assertTrue("System ecPolicies should be of only 1 for now",
|
||||
sysECPolicies.length == 1);
|
||||
|
||||
.getSystemPolicies();
|
||||
Collection<ErasureCodingPolicy> allECPolicies = fs
|
||||
.getAllErasureCodingPolicies();
|
||||
assertTrue("All ecPolicies should be of only 1 for now",
|
||||
allECPolicies.size() == 1);
|
||||
assertEquals("Erasure coding policy mismatches",
|
||||
sysECPolicies[0], allECPolicies.iterator().next());
|
||||
assertTrue("All system policies should be active",
|
||||
allECPolicies.containsAll(Arrays.asList(sysECPolicies)));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
|
@ -329,4 +329,23 @@ public class TestErasureCodingPolicies {
|
|||
assertExceptionContains("Path not found: " + path, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testMultiplePoliciesCoExist() throws Exception {
|
||||
ErasureCodingPolicy[] sysPolicies =
|
||||
ErasureCodingPolicyManager.getSystemPolicies();
|
||||
if (sysPolicies.length > 1) {
|
||||
for (ErasureCodingPolicy policy : sysPolicies) {
|
||||
Path dir = new Path("/policy_" + policy.getId());
|
||||
fs.mkdir(dir, FsPermission.getDefault());
|
||||
fs.setErasureCodingPolicy(dir, policy);
|
||||
Path file = new Path(dir, "child");
|
||||
fs.create(file).close();
|
||||
assertEquals(policy, fs.getErasureCodingPolicy(file));
|
||||
assertEquals(policy, fs.getErasureCodingPolicy(dir));
|
||||
INode iNode = namesystem.getFSDirectory().getINode(file.toString());
|
||||
assertEquals(policy.getId(), iNode.asFile().getFileReplication());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,10 +58,35 @@ public class TestReconstructStripedBlocksWithRackAwareness {
|
|||
GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
|
||||
}
|
||||
|
||||
private static final String[] hosts = new String[]{"host1", "host2", "host3",
|
||||
"host4", "host5", "host6", "host7", "host8", "host9", "host10"};
|
||||
private static final String[] racks = new String[]{"/r1", "/r1", "/r2", "/r2",
|
||||
"/r3", "/r3", "/r4", "/r4", "/r5", "/r6"};
|
||||
private static final String[] hosts = getHosts();
|
||||
private static final String[] racks = getRacks();
|
||||
|
||||
private static String[] getHosts() {
|
||||
String[] hosts = new String[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1];
|
||||
for (int i = 0; i < hosts.length; i++) {
|
||||
hosts[i] = "host" + (i + 1);
|
||||
}
|
||||
return hosts;
|
||||
}
|
||||
|
||||
private static String[] getRacks() {
|
||||
String[] racks = new String[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1];
|
||||
int numHostEachRack = (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS - 1) /
|
||||
(NUM_DATA_BLOCKS - 1) + 1;
|
||||
int j = 0;
|
||||
// we have NUM_DATA_BLOCKS racks
|
||||
for (int i = 1; i <= NUM_DATA_BLOCKS; i++) {
|
||||
if (j == racks.length - 1) {
|
||||
assert i == NUM_DATA_BLOCKS;
|
||||
racks[j++] = "/r" + i;
|
||||
} else {
|
||||
for (int k = 0; k < numHostEachRack && j < racks.length - 1; k++) {
|
||||
racks[j++] = "/r" + i;
|
||||
}
|
||||
}
|
||||
}
|
||||
return racks;
|
||||
}
|
||||
|
||||
private MiniDFSCluster cluster;
|
||||
private DistributedFileSystem fs;
|
||||
|
@ -118,7 +143,8 @@ public class TestReconstructStripedBlocksWithRackAwareness {
|
|||
*/
|
||||
@Test
|
||||
public void testReconstructForNotEnoughRacks() throws Exception {
|
||||
MiniDFSCluster.DataNodeProperties host10 = stopDataNode("host10");
|
||||
MiniDFSCluster.DataNodeProperties lastHost = stopDataNode(
|
||||
hosts[hosts.length - 1]);
|
||||
|
||||
final Path file = new Path("/foo");
|
||||
// the file's block is in 9 dn but 5 racks
|
||||
|
@ -135,16 +161,16 @@ public class TestReconstructStripedBlocksWithRackAwareness {
|
|||
for (DatanodeStorageInfo storage : blockInfo.storages) {
|
||||
rackSet.add(storage.getDatanodeDescriptor().getNetworkLocation());
|
||||
}
|
||||
Assert.assertEquals(5, rackSet.size());
|
||||
Assert.assertEquals(NUM_DATA_BLOCKS - 1, rackSet.size());
|
||||
|
||||
// restart the stopped datanode
|
||||
cluster.restartDataNode(host10);
|
||||
cluster.restartDataNode(lastHost);
|
||||
cluster.waitActive();
|
||||
|
||||
// make sure we have 6 racks again
|
||||
NetworkTopology topology = bm.getDatanodeManager().getNetworkTopology();
|
||||
Assert.assertEquals(hosts.length, topology.getNumOfLeaves());
|
||||
Assert.assertEquals(6, topology.getNumOfRacks());
|
||||
Assert.assertEquals(NUM_DATA_BLOCKS, topology.getNumOfRacks());
|
||||
|
||||
// pause all the heartbeats
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
|
@ -180,7 +206,8 @@ public class TestReconstructStripedBlocksWithRackAwareness {
|
|||
|
||||
@Test
|
||||
public void testChooseExcessReplicasToDelete() throws Exception {
|
||||
MiniDFSCluster.DataNodeProperties host10 = stopDataNode("host10");
|
||||
MiniDFSCluster.DataNodeProperties lastHost = stopDataNode(
|
||||
hosts[hosts.length - 1]);
|
||||
|
||||
final Path file = new Path("/foo");
|
||||
DFSTestUtil.createFile(fs, file,
|
||||
|
@ -188,8 +215,8 @@ public class TestReconstructStripedBlocksWithRackAwareness {
|
|||
|
||||
// stop host1
|
||||
MiniDFSCluster.DataNodeProperties host1 = stopDataNode("host1");
|
||||
// bring host10 back
|
||||
cluster.restartDataNode(host10);
|
||||
// bring last host back
|
||||
cluster.restartDataNode(lastHost);
|
||||
cluster.waitActive();
|
||||
|
||||
// wait for reconstruction to finish
|
||||
|
|
|
@ -777,8 +777,9 @@ public class TestBlockRecovery {
|
|||
|
||||
@Test
|
||||
public void testSafeLength() throws Exception {
|
||||
// hard coded policy to work with hard coded test suite
|
||||
ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager
|
||||
.getSystemDefaultPolicy();
|
||||
.getSystemPolicies()[0];
|
||||
RecoveringStripedBlock rBlockStriped = new RecoveringStripedBlock(rBlock,
|
||||
new byte[9], ecPolicy);
|
||||
BlockRecoveryWorker recoveryWorker = new BlockRecoveryWorker(dn);
|
||||
|
|
|
@ -151,7 +151,7 @@ public class TestFSImage {
|
|||
long mtime = 1426222916-3600;
|
||||
long atime = 1426222916;
|
||||
BlockInfoContiguous[] blks = new BlockInfoContiguous[0];
|
||||
short replication = 3;
|
||||
short replication = testECPolicy.getId();
|
||||
long preferredBlockSize = 128*1024*1024;
|
||||
INodeFile file = new INodeFile(id, name, permissionStatus, mtime, atime,
|
||||
blks, replication, preferredBlockSize, (byte) 0, true);
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
|
@ -61,8 +60,9 @@ public class TestStripedINodeFile {
|
|||
private final BlockStoragePolicy defaultPolicy =
|
||||
defaultSuite.getDefaultPolicy();
|
||||
|
||||
// use hard coded policy - see HDFS-9816
|
||||
private static final ErasureCodingPolicy testECPolicy
|
||||
= ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
= ErasureCodingPolicyManager.getSystemPolicies()[0];
|
||||
|
||||
@Rule
|
||||
public Timeout globalTimeout = new Timeout(300000);
|
||||
|
@ -228,8 +228,8 @@ public class TestStripedINodeFile {
|
|||
final Path contiguousFile = new Path(parentDir, "someFile");
|
||||
final DistributedFileSystem dfs;
|
||||
final Configuration conf = new Configuration();
|
||||
final short GROUP_SIZE = (short) (StripedFileTestUtil.NUM_DATA_BLOCKS
|
||||
+ StripedFileTestUtil.NUM_PARITY_BLOCKS);
|
||||
final short GROUP_SIZE = (short) (testECPolicy.getNumDataUnits() +
|
||||
testECPolicy.getNumParityUnits());
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY, 2);
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE)
|
||||
|
|
|
@ -79,15 +79,16 @@ import static org.junit.Assert.assertFalse;
|
|||
* TODO: test parity block logic
|
||||
*/
|
||||
public class TestStripedBlockUtil {
|
||||
private final short DATA_BLK_NUM = StripedFileTestUtil.NUM_DATA_BLOCKS;
|
||||
private final short PARITY_BLK_NUM = StripedFileTestUtil.NUM_PARITY_BLOCKS;
|
||||
// use hard coded policy - see HDFS-9816
|
||||
private final ErasureCodingPolicy EC_POLICY =
|
||||
ErasureCodingPolicyManager.getSystemPolicies()[0];
|
||||
private final short DATA_BLK_NUM = (short) EC_POLICY.getNumDataUnits();
|
||||
private final short PARITY_BLK_NUM = (short) EC_POLICY.getNumParityUnits();
|
||||
private final short BLK_GROUP_WIDTH = (short) (DATA_BLK_NUM + PARITY_BLK_NUM);
|
||||
private final int CELLSIZE = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
|
||||
private final int FULL_STRIPE_SIZE = DATA_BLK_NUM * CELLSIZE;
|
||||
/** number of full stripes in a full block group */
|
||||
private final int BLK_GROUP_STRIPE_NUM = 16;
|
||||
private final ErasureCodingPolicy ECPOLICY = ErasureCodingPolicyManager.
|
||||
getSystemDefaultPolicy();
|
||||
private final Random random = new Random();
|
||||
|
||||
private int[] blockGroupSizes;
|
||||
|
@ -157,7 +158,7 @@ public class TestStripedBlockUtil {
|
|||
int done = 0;
|
||||
while (done < bgSize) {
|
||||
Preconditions.checkState(done % CELLSIZE == 0);
|
||||
StripingCell cell = new StripingCell(ECPOLICY, CELLSIZE, done / CELLSIZE, 0);
|
||||
StripingCell cell = new StripingCell(EC_POLICY, CELLSIZE, done / CELLSIZE, 0);
|
||||
int idxInStripe = cell.idxInStripe;
|
||||
int size = Math.min(CELLSIZE, bgSize - done);
|
||||
for (int i = 0; i < size; i++) {
|
||||
|
@ -250,7 +251,7 @@ public class TestStripedBlockUtil {
|
|||
if (brStart + brSize > bgSize) {
|
||||
continue;
|
||||
}
|
||||
AlignedStripe[] stripes = divideByteRangeIntoStripes(ECPOLICY,
|
||||
AlignedStripe[] stripes = divideByteRangeIntoStripes(EC_POLICY,
|
||||
CELLSIZE, blockGroup, brStart, brStart + brSize - 1, assembled, 0);
|
||||
|
||||
for (AlignedStripe stripe : stripes) {
|
||||
|
|
Loading…
Reference in New Issue