HDFS-7859. Erasure Coding: Persist erasure coding policies in NameNode. Contributed by Sammi Chen
This commit is contained in:
parent
e140489147
commit
02c8807312
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -289,6 +290,11 @@ public final class ErasureCodingPolicyManager {
|
|||
}
|
||||
ecPolicy.setState(ErasureCodingPolicyState.REMOVED);
|
||||
LOG.info("Remove erasure coding policy " + name);
|
||||
|
||||
/*
|
||||
* TODO HDFS-12405 postpone the delete removed policy to Namenode restart
|
||||
* time.
|
||||
* */
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -338,4 +344,36 @@ public final class ErasureCodingPolicyManager {
|
|||
enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
|
||||
LOG.info("Enable the erasure coding policy " + name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Load an erasure coding policy into erasure coding manager.
|
||||
*/
|
||||
private void loadPolicy(ErasureCodingPolicy policy) {
|
||||
if (!CodecUtil.hasCodec(policy.getCodecName()) ||
|
||||
policy.getCellSize() > maxCellSize) {
|
||||
// If policy is not supported in current system, set the policy state to
|
||||
// DISABLED;
|
||||
policy.setState(ErasureCodingPolicyState.DISABLED);
|
||||
}
|
||||
|
||||
this.policiesByName.put(policy.getName(), policy);
|
||||
this.policiesByID.put(policy.getId(), policy);
|
||||
if (policy.isEnabled()) {
|
||||
enablePolicy(policy.getName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reload erasure coding policies from fsImage.
|
||||
*
|
||||
* @param ecPolicies contains ErasureCodingPolicy list
|
||||
*
|
||||
*/
|
||||
public synchronized void loadPolicies(List<ErasureCodingPolicy> ecPolicies) {
|
||||
Preconditions.checkNotNull(ecPolicies);
|
||||
for (ErasureCodingPolicy p : ecPolicies) {
|
||||
loadPolicy(p);
|
||||
}
|
||||
allPolicies = policiesByName.values().toArray(new ErasureCodingPolicy[0]);
|
||||
}
|
||||
}
|
|
@ -334,10 +334,10 @@ public final class FSImageFormatPBINode {
|
|||
boolean isStriped = f.hasErasureCodingPolicyID();
|
||||
assert ((!isStriped) || (isStriped && !f.hasReplication()));
|
||||
Short replication = (!isStriped ? (short) f.getReplication() : null);
|
||||
Byte ecPolicyID = (isStriped ?
|
||||
(byte) f.getErasureCodingPolicyID() : null);
|
||||
ErasureCodingPolicy ecPolicy = isStriped ?
|
||||
fsn.getErasureCodingPolicyManager().getByID(
|
||||
(byte) f.getErasureCodingPolicyID()) : null;
|
||||
Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
|
||||
fsn.getErasureCodingPolicyManager().getByID(ecPolicyID) : null;
|
||||
|
||||
BlockInfo[] blocks = new BlockInfo[bp.size()];
|
||||
for (int i = 0; i < bp.size(); ++i) {
|
||||
|
|
|
@ -36,10 +36,13 @@ import java.security.MessageDigest;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.io.compress.CompressionOutputStream;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -47,6 +50,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
|
@ -55,6 +59,7 @@ import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
|
|||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.ErasureCodingSection;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.FSImageFormatPBSnapshot;
|
||||
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
|
||||
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
|
||||
|
@ -287,6 +292,12 @@ public final class FSImageFormatProtobuf {
|
|||
prog.endStep(Phase.LOADING_FSIMAGE, step);
|
||||
}
|
||||
break;
|
||||
case ERASURE_CODING:
|
||||
Step step = new Step(StepType.ERASURE_CODING_POLICIES);
|
||||
prog.beginStep(Phase.LOADING_FSIMAGE, step);
|
||||
loadErasureCodingSection(in);
|
||||
prog.endStep(Phase.LOADING_FSIMAGE, step);
|
||||
break;
|
||||
default:
|
||||
LOG.warn("Unrecognized section {}", n);
|
||||
break;
|
||||
|
@ -366,6 +377,17 @@ public final class FSImageFormatProtobuf {
|
|||
new CacheManager.PersistState(s, pools, directives));
|
||||
}
|
||||
|
||||
private void loadErasureCodingSection(InputStream in)
|
||||
throws IOException {
|
||||
ErasureCodingSection s = ErasureCodingSection.parseDelimitedFrom(in);
|
||||
List<ErasureCodingPolicy> ecPolicies = Lists
|
||||
.newArrayListWithCapacity(s.getPoliciesCount());
|
||||
for (int i = 0; i < s.getPoliciesCount(); ++i) {
|
||||
ecPolicies.add(PBHelperClient.convertErasureCodingPolicy(
|
||||
s.getPolicies(i)));
|
||||
}
|
||||
fsn.getErasureCodingPolicyManager().loadPolicies(ecPolicies);
|
||||
}
|
||||
}
|
||||
|
||||
public static final class Saver {
|
||||
|
@ -497,7 +519,13 @@ public final class FSImageFormatProtobuf {
|
|||
// depends on this behavior.
|
||||
context.checkCancelled();
|
||||
|
||||
Step step = new Step(StepType.INODES, filePath);
|
||||
// Erasure coding policies should be saved before inodes
|
||||
Step step = new Step(StepType.ERASURE_CODING_POLICIES, filePath);
|
||||
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
|
||||
saveErasureCodingSection(b);
|
||||
prog.endStep(Phase.SAVING_CHECKPOINT, step);
|
||||
|
||||
step = new Step(StepType.INODES, filePath);
|
||||
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
|
||||
saveInodes(b);
|
||||
saveSnapshots(b);
|
||||
|
@ -555,6 +583,23 @@ public final class FSImageFormatProtobuf {
|
|||
commitSection(summary, SectionName.CACHE_MANAGER);
|
||||
}
|
||||
|
||||
private void saveErasureCodingSection(
|
||||
FileSummary.Builder summary) throws IOException {
|
||||
final FSNamesystem fsn = context.getSourceNamesystem();
|
||||
ErasureCodingPolicy[] ecPolicies =
|
||||
fsn.getErasureCodingPolicyManager().getPolicies();
|
||||
ArrayList<ErasureCodingPolicyProto> ecPolicyProtoes =
|
||||
new ArrayList<ErasureCodingPolicyProto>();
|
||||
for (ErasureCodingPolicy p : ecPolicies) {
|
||||
ecPolicyProtoes.add(PBHelperClient.convertErasureCodingPolicy(p));
|
||||
}
|
||||
|
||||
ErasureCodingSection section = ErasureCodingSection.newBuilder().
|
||||
addAllPolicies(ecPolicyProtoes).build();
|
||||
section.writeDelimitedTo(sectionOutputStream);
|
||||
commitSection(summary, SectionName.ERASURE_CODING);
|
||||
}
|
||||
|
||||
private void saveNameSystemSection(FileSummary.Builder summary)
|
||||
throws IOException {
|
||||
final FSNamesystem fsn = context.getSourceNamesystem();
|
||||
|
@ -606,6 +651,7 @@ public final class FSImageFormatProtobuf {
|
|||
NS_INFO("NS_INFO"),
|
||||
STRING_TABLE("STRING_TABLE"),
|
||||
EXTENDED_ACL("EXTENDED_ACL"),
|
||||
ERASURE_CODING("ERASURE_CODING"),
|
||||
INODE("INODE"),
|
||||
INODE_REFERENCE("INODE_REFERENCE"),
|
||||
SNAPSHOT("SNAPSHOT"),
|
||||
|
|
|
@ -52,7 +52,12 @@ public enum StepType {
|
|||
/**
|
||||
* The namenode is performing an operation related to cache entries.
|
||||
*/
|
||||
CACHE_ENTRIES("CacheEntries", "cache entries");
|
||||
CACHE_ENTRIES("CacheEntries", "cache entries"),
|
||||
|
||||
/**
|
||||
* The namenode is performing an operation related to erasure coding policies.
|
||||
*/
|
||||
ERASURE_CODING_POLICIES("ErasureCodingPolicies", "erasure coding policies");
|
||||
|
||||
private final String name, description;
|
||||
|
||||
|
|
|
@ -346,3 +346,7 @@ message CacheManagerSection {
|
|||
// repeated CachePoolInfoProto pools
|
||||
// repeated CacheDirectiveInfoProto directives
|
||||
}
|
||||
|
||||
message ErasureCodingSection {
|
||||
repeated ErasureCodingPolicyProto policies = 1;
|
||||
}
|
|
@ -209,11 +209,6 @@ public class TestErasureCodingPolicies {
|
|||
cluster.restartNameNodes();
|
||||
cluster.waitActive();
|
||||
|
||||
// Only default policy should be enabled after restart
|
||||
Assert.assertEquals("Only default policy should be enabled after restart",
|
||||
1,
|
||||
ErasureCodingPolicyManager.getInstance().getEnabledPolicies().length);
|
||||
|
||||
// Already set directory-level policies should still be in effect
|
||||
Path disabledPolicy = new Path(dir1, "afterDisabled");
|
||||
Assert.assertEquals("Dir does not have policy set",
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
@ -34,7 +35,9 @@ import java.io.IOException;
|
|||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
|
@ -43,6 +46,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
|
@ -810,4 +815,150 @@ public class TestFSImage {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test persist and load erasure coding policies.
|
||||
*/
|
||||
@Test
|
||||
public void testSaveAndLoadErasureCodingPolicies() throws IOException{
|
||||
Configuration conf = new Configuration();
|
||||
final int blockSize = 16 * 1024 * 1024;
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
try (MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(10).build()) {
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
DFSTestUtil.enableAllECPolicies(fs);
|
||||
|
||||
// Save namespace and restart NameNode
|
||||
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||
fs.saveNamespace();
|
||||
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||
|
||||
cluster.restartNameNodes();
|
||||
cluster.waitActive();
|
||||
|
||||
assertEquals("Erasure coding policy number should match",
|
||||
SystemErasureCodingPolicies.getPolicies().size(),
|
||||
ErasureCodingPolicyManager.getInstance().getPolicies().length);
|
||||
|
||||
// Add new erasure coding policy
|
||||
ECSchema newSchema = new ECSchema("rs", 5, 4);
|
||||
ErasureCodingPolicy newPolicy =
|
||||
new ErasureCodingPolicy(newSchema, 2 * 1024, (byte) 254);
|
||||
ErasureCodingPolicy[] policies = new ErasureCodingPolicy[]{newPolicy};
|
||||
AddECPolicyResponse[] ret = fs.addErasureCodingPolicies(policies);
|
||||
assertEquals(1, ret.length);
|
||||
assertEquals(true, ret[0].isSucceed());
|
||||
newPolicy = ret[0].getPolicy();
|
||||
|
||||
// Save namespace and restart NameNode
|
||||
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||
fs.saveNamespace();
|
||||
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||
|
||||
cluster.restartNameNodes();
|
||||
cluster.waitActive();
|
||||
|
||||
assertEquals("Erasure coding policy number should match",
|
||||
SystemErasureCodingPolicies.getPolicies().size() + 1,
|
||||
ErasureCodingPolicyManager.getInstance().getPolicies().length);
|
||||
ErasureCodingPolicy ecPolicy =
|
||||
ErasureCodingPolicyManager.getInstance().getByID(newPolicy.getId());
|
||||
assertEquals("Newly added erasure coding policy is not found",
|
||||
newPolicy, ecPolicy);
|
||||
assertEquals(
|
||||
"Newly added erasure coding policy should be of disabled state",
|
||||
ErasureCodingPolicyState.DISABLED, ecPolicy.getState());
|
||||
|
||||
// Test enable/disable/remove user customized erasure coding policy
|
||||
testChangeErasureCodingPolicyState(cluster, blockSize, newPolicy);
|
||||
// Test enable/disable built-in erasure coding policy
|
||||
testChangeErasureCodingPolicyState(cluster, blockSize,
|
||||
SystemErasureCodingPolicies.getByID((byte) 1));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void testChangeErasureCodingPolicyState(MiniDFSCluster cluster,
|
||||
int blockSize, ErasureCodingPolicy targetPolicy) throws IOException {
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
|
||||
// 1. Enable an erasure coding policy
|
||||
fs.enableErasureCodingPolicy(targetPolicy.getName());
|
||||
targetPolicy.setState(ErasureCodingPolicyState.ENABLED);
|
||||
// Create file, using the new policy
|
||||
final Path dirPath = new Path("/striped");
|
||||
final Path filePath = new Path(dirPath, "file");
|
||||
final int fileLength = blockSize * targetPolicy.getNumDataUnits();
|
||||
fs.mkdirs(dirPath);
|
||||
fs.setErasureCodingPolicy(dirPath, targetPolicy.getName());
|
||||
final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
|
||||
DFSTestUtil.writeFile(fs, filePath, bytes);
|
||||
|
||||
|
||||
// Save namespace and restart NameNode
|
||||
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||
fs.saveNamespace();
|
||||
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||
|
||||
cluster.restartNameNodes();
|
||||
cluster.waitActive();
|
||||
ErasureCodingPolicy ecPolicy =
|
||||
ErasureCodingPolicyManager.getInstance().getByID(targetPolicy.getId());
|
||||
assertEquals("The erasure coding policy is not found",
|
||||
targetPolicy, ecPolicy);
|
||||
assertEquals("The erasure coding policy should be of enabled state",
|
||||
ErasureCodingPolicyState.ENABLED, ecPolicy.getState());
|
||||
// Read file regardless of the erasure coding policy state
|
||||
DFSTestUtil.readFileAsBytes(fs, filePath);
|
||||
|
||||
// 2. Disable an erasure coding policy
|
||||
fs.disableErasureCodingPolicy(ecPolicy.getName());
|
||||
targetPolicy.setState(ErasureCodingPolicyState.DISABLED);
|
||||
// Save namespace and restart NameNode
|
||||
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||
fs.saveNamespace();
|
||||
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||
|
||||
cluster.restartNameNodes();
|
||||
cluster.waitActive();
|
||||
ecPolicy =
|
||||
ErasureCodingPolicyManager.getInstance().getByID(targetPolicy.getId());
|
||||
assertEquals("The erasure coding policy is not found",
|
||||
targetPolicy, ecPolicy);
|
||||
assertEquals("The erasure coding policy should be of disabled state",
|
||||
ErasureCodingPolicyState.DISABLED, ecPolicy.getState());
|
||||
// Read file regardless of the erasure coding policy state
|
||||
DFSTestUtil.readFileAsBytes(fs, filePath);
|
||||
|
||||
// 3. Remove an erasure coding policy
|
||||
try {
|
||||
fs.removeErasureCodingPolicy(ecPolicy.getName());
|
||||
} catch (RemoteException e) {
|
||||
// built-in policy cannot been removed
|
||||
assertTrue("Built-in policy cannot be removed",
|
||||
ecPolicy.isSystemPolicy());
|
||||
assertExceptionContains("System erasure coding policy", e);
|
||||
return;
|
||||
}
|
||||
|
||||
targetPolicy.setState(ErasureCodingPolicyState.REMOVED);
|
||||
// Save namespace and restart NameNode
|
||||
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||
fs.saveNamespace();
|
||||
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||
|
||||
cluster.restartNameNodes();
|
||||
cluster.waitActive();
|
||||
ecPolicy = ErasureCodingPolicyManager.getInstance().getByID(
|
||||
targetPolicy.getId());
|
||||
assertEquals("The erasure coding policy saved into and loaded from " +
|
||||
"fsImage is bad", targetPolicy, ecPolicy);
|
||||
assertEquals("The erasure coding policy should be of removed state",
|
||||
ErasureCodingPolicyState.REMOVED, ecPolicy.getState());
|
||||
// Read file regardless of the erasure coding policy state
|
||||
DFSTestUtil.readFileAsBytes(fs, filePath);
|
||||
fs.delete(dirPath, true);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue