HDFS-7859. Erasure Coding: Persist erasure coding policies in NameNode. Contributed by Sammi Chen

This commit is contained in:
Kai Zheng 2017-09-15 09:08:18 +08:00
parent 61cee3a0b9
commit ae8f55b932
7 changed files with 249 additions and 10 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -289,6 +290,11 @@ public final class ErasureCodingPolicyManager {
}
ecPolicy.setState(ErasureCodingPolicyState.REMOVED);
LOG.info("Remove erasure coding policy " + name);
/*
* TODO HDFS-12405 postpone the delete removed policy to Namenode restart
* time.
* */
}
@VisibleForTesting
@ -338,4 +344,36 @@ public final class ErasureCodingPolicyManager {
enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
LOG.info("Enable the erasure coding policy " + name);
}
/**
* Load an erasure coding policy into erasure coding manager.
*/
private void loadPolicy(ErasureCodingPolicy policy) {
if (!CodecUtil.hasCodec(policy.getCodecName()) ||
policy.getCellSize() > maxCellSize) {
// If policy is not supported in current system, set the policy state to
// DISABLED;
policy.setState(ErasureCodingPolicyState.DISABLED);
}
this.policiesByName.put(policy.getName(), policy);
this.policiesByID.put(policy.getId(), policy);
if (policy.isEnabled()) {
enablePolicy(policy.getName());
}
}
/**
* Reload erasure coding policies from fsImage.
*
* @param ecPolicies contains ErasureCodingPolicy list
*
*/
public synchronized void loadPolicies(List<ErasureCodingPolicy> ecPolicies) {
Preconditions.checkNotNull(ecPolicies);
for (ErasureCodingPolicy p : ecPolicies) {
loadPolicy(p);
}
allPolicies = policiesByName.values().toArray(new ErasureCodingPolicy[0]);
}
}

View File

@ -334,10 +334,10 @@ public final class FSImageFormatPBINode {
boolean isStriped = f.hasErasureCodingPolicyID();
assert ((!isStriped) || (isStriped && !f.hasReplication()));
Short replication = (!isStriped ? (short) f.getReplication() : null);
Byte ecPolicyID = (isStriped ?
(byte) f.getErasureCodingPolicyID() : null);
ErasureCodingPolicy ecPolicy = isStriped ?
fsn.getErasureCodingPolicyManager().getByID(
(byte) f.getErasureCodingPolicyID()) : null;
Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
fsn.getErasureCodingPolicyManager().getByID(ecPolicyID) : null;
BlockInfo[] blocks = new BlockInfo[bp.size()];
for (int i = 0; i < bp.size(); ++i) {

View File

@ -36,10 +36,13 @@ import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,6 +50,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@ -55,6 +59,7 @@ import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.ErasureCodingSection;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FSImageFormatPBSnapshot;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
@ -287,6 +292,12 @@ public final class FSImageFormatProtobuf {
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
break;
case ERASURE_CODING:
Step step = new Step(StepType.ERASURE_CODING_POLICIES);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
loadErasureCodingSection(in);
prog.endStep(Phase.LOADING_FSIMAGE, step);
break;
default:
LOG.warn("Unrecognized section {}", n);
break;
@ -366,6 +377,17 @@ public final class FSImageFormatProtobuf {
new CacheManager.PersistState(s, pools, directives));
}
private void loadErasureCodingSection(InputStream in)
throws IOException {
ErasureCodingSection s = ErasureCodingSection.parseDelimitedFrom(in);
List<ErasureCodingPolicy> ecPolicies = Lists
.newArrayListWithCapacity(s.getPoliciesCount());
for (int i = 0; i < s.getPoliciesCount(); ++i) {
ecPolicies.add(PBHelperClient.convertErasureCodingPolicy(
s.getPolicies(i)));
}
fsn.getErasureCodingPolicyManager().loadPolicies(ecPolicies);
}
}
public static final class Saver {
@ -497,7 +519,13 @@ public final class FSImageFormatProtobuf {
// depends on this behavior.
context.checkCancelled();
Step step = new Step(StepType.INODES, filePath);
// Erasure coding policies should be saved before inodes
Step step = new Step(StepType.ERASURE_CODING_POLICIES, filePath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
saveErasureCodingSection(b);
prog.endStep(Phase.SAVING_CHECKPOINT, step);
step = new Step(StepType.INODES, filePath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
saveInodes(b);
saveSnapshots(b);
@ -555,6 +583,23 @@ public final class FSImageFormatProtobuf {
commitSection(summary, SectionName.CACHE_MANAGER);
}
private void saveErasureCodingSection(
FileSummary.Builder summary) throws IOException {
final FSNamesystem fsn = context.getSourceNamesystem();
ErasureCodingPolicy[] ecPolicies =
fsn.getErasureCodingPolicyManager().getPolicies();
ArrayList<ErasureCodingPolicyProto> ecPolicyProtoes =
new ArrayList<ErasureCodingPolicyProto>();
for (ErasureCodingPolicy p : ecPolicies) {
ecPolicyProtoes.add(PBHelperClient.convertErasureCodingPolicy(p));
}
ErasureCodingSection section = ErasureCodingSection.newBuilder().
addAllPolicies(ecPolicyProtoes).build();
section.writeDelimitedTo(sectionOutputStream);
commitSection(summary, SectionName.ERASURE_CODING);
}
private void saveNameSystemSection(FileSummary.Builder summary)
throws IOException {
final FSNamesystem fsn = context.getSourceNamesystem();
@ -606,6 +651,7 @@ public final class FSImageFormatProtobuf {
NS_INFO("NS_INFO"),
STRING_TABLE("STRING_TABLE"),
EXTENDED_ACL("EXTENDED_ACL"),
ERASURE_CODING("ERASURE_CODING"),
INODE("INODE"),
INODE_REFERENCE("INODE_REFERENCE"),
SNAPSHOT("SNAPSHOT"),

View File

@ -52,7 +52,12 @@ public enum StepType {
/**
* The namenode is performing an operation related to cache entries.
*/
CACHE_ENTRIES("CacheEntries", "cache entries");
CACHE_ENTRIES("CacheEntries", "cache entries"),
/**
* The namenode is performing an operation related to erasure coding policies.
*/
ERASURE_CODING_POLICIES("ErasureCodingPolicies", "erasure coding policies");
private final String name, description;

View File

@ -346,3 +346,7 @@ message CacheManagerSection {
// repeated CachePoolInfoProto pools
// repeated CacheDirectiveInfoProto directives
}
message ErasureCodingSection {
repeated ErasureCodingPolicyProto policies = 1;
}

View File

@ -209,11 +209,6 @@ public class TestErasureCodingPolicies {
cluster.restartNameNodes();
cluster.waitActive();
// Only default policy should be enabled after restart
Assert.assertEquals("Only default policy should be enabled after restart",
1,
ErasureCodingPolicyManager.getInstance().getEnabledPolicies().length);
// Already set directory-level policies should still be in effect
Path disabledPolicy = new Path(dir1, "afterDisabled");
Assert.assertEquals("Dir does not have policy set",

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -34,7 +35,9 @@ import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
@ -43,6 +46,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.Assert;
import org.apache.hadoop.fs.permission.PermissionStatus;
@ -810,4 +815,150 @@ public class TestFSImage {
}
}
}
/**
* Test persist and load erasure coding policies.
*/
@Test
public void testSaveAndLoadErasureCodingPolicies() throws IOException{
Configuration conf = new Configuration();
final int blockSize = 16 * 1024 * 1024;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
try (MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(10).build()) {
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
DFSTestUtil.enableAllECPolicies(fs);
// Save namespace and restart NameNode
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.restartNameNodes();
cluster.waitActive();
assertEquals("Erasure coding policy number should match",
SystemErasureCodingPolicies.getPolicies().size(),
ErasureCodingPolicyManager.getInstance().getPolicies().length);
// Add new erasure coding policy
ECSchema newSchema = new ECSchema("rs", 5, 4);
ErasureCodingPolicy newPolicy =
new ErasureCodingPolicy(newSchema, 2 * 1024, (byte) 254);
ErasureCodingPolicy[] policies = new ErasureCodingPolicy[]{newPolicy};
AddECPolicyResponse[] ret = fs.addErasureCodingPolicies(policies);
assertEquals(1, ret.length);
assertEquals(true, ret[0].isSucceed());
newPolicy = ret[0].getPolicy();
// Save namespace and restart NameNode
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.restartNameNodes();
cluster.waitActive();
assertEquals("Erasure coding policy number should match",
SystemErasureCodingPolicies.getPolicies().size() + 1,
ErasureCodingPolicyManager.getInstance().getPolicies().length);
ErasureCodingPolicy ecPolicy =
ErasureCodingPolicyManager.getInstance().getByID(newPolicy.getId());
assertEquals("Newly added erasure coding policy is not found",
newPolicy, ecPolicy);
assertEquals(
"Newly added erasure coding policy should be of disabled state",
ErasureCodingPolicyState.DISABLED, ecPolicy.getState());
// Test enable/disable/remove user customized erasure coding policy
testChangeErasureCodingPolicyState(cluster, blockSize, newPolicy);
// Test enable/disable built-in erasure coding policy
testChangeErasureCodingPolicyState(cluster, blockSize,
SystemErasureCodingPolicies.getByID((byte) 1));
}
}
private void testChangeErasureCodingPolicyState(MiniDFSCluster cluster,
int blockSize, ErasureCodingPolicy targetPolicy) throws IOException {
DistributedFileSystem fs = cluster.getFileSystem();
// 1. Enable an erasure coding policy
fs.enableErasureCodingPolicy(targetPolicy.getName());
targetPolicy.setState(ErasureCodingPolicyState.ENABLED);
// Create file, using the new policy
final Path dirPath = new Path("/striped");
final Path filePath = new Path(dirPath, "file");
final int fileLength = blockSize * targetPolicy.getNumDataUnits();
fs.mkdirs(dirPath);
fs.setErasureCodingPolicy(dirPath, targetPolicy.getName());
final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
DFSTestUtil.writeFile(fs, filePath, bytes);
// Save namespace and restart NameNode
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.restartNameNodes();
cluster.waitActive();
ErasureCodingPolicy ecPolicy =
ErasureCodingPolicyManager.getInstance().getByID(targetPolicy.getId());
assertEquals("The erasure coding policy is not found",
targetPolicy, ecPolicy);
assertEquals("The erasure coding policy should be of enabled state",
ErasureCodingPolicyState.ENABLED, ecPolicy.getState());
// Read file regardless of the erasure coding policy state
DFSTestUtil.readFileAsBytes(fs, filePath);
// 2. Disable an erasure coding policy
fs.disableErasureCodingPolicy(ecPolicy.getName());
targetPolicy.setState(ErasureCodingPolicyState.DISABLED);
// Save namespace and restart NameNode
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.restartNameNodes();
cluster.waitActive();
ecPolicy =
ErasureCodingPolicyManager.getInstance().getByID(targetPolicy.getId());
assertEquals("The erasure coding policy is not found",
targetPolicy, ecPolicy);
assertEquals("The erasure coding policy should be of disabled state",
ErasureCodingPolicyState.DISABLED, ecPolicy.getState());
// Read file regardless of the erasure coding policy state
DFSTestUtil.readFileAsBytes(fs, filePath);
// 3. Remove an erasure coding policy
try {
fs.removeErasureCodingPolicy(ecPolicy.getName());
} catch (RemoteException e) {
// built-in policy cannot been removed
assertTrue("Built-in policy cannot be removed",
ecPolicy.isSystemPolicy());
assertExceptionContains("System erasure coding policy", e);
return;
}
targetPolicy.setState(ErasureCodingPolicyState.REMOVED);
// Save namespace and restart NameNode
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.restartNameNodes();
cluster.waitActive();
ecPolicy = ErasureCodingPolicyManager.getInstance().getByID(
targetPolicy.getId());
assertEquals("The erasure coding policy saved into and loaded from " +
"fsImage is bad", targetPolicy, ecPolicy);
assertEquals("The erasure coding policy should be of removed state",
ErasureCodingPolicyState.REMOVED, ecPolicy.getState());
// Read file regardless of the erasure coding policy state
DFSTestUtil.readFileAsBytes(fs, filePath);
fs.delete(dirPath, true);
}
}