diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java index 77deb856c26..3a46c3049ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -289,6 +290,11 @@ public final class ErasureCodingPolicyManager { } ecPolicy.setState(ErasureCodingPolicyState.REMOVED); LOG.info("Remove erasure coding policy " + name); + + /* + * TODO HDFS-12405 postpone the delete removed policy to Namenode restart + * time. + * */ } @VisibleForTesting @@ -338,4 +344,36 @@ public final class ErasureCodingPolicyManager { enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]); LOG.info("Enable the erasure coding policy " + name); } + + /** + * Load an erasure coding policy into erasure coding manager. + */ + private void loadPolicy(ErasureCodingPolicy policy) { + if (!CodecUtil.hasCodec(policy.getCodecName()) || + policy.getCellSize() > maxCellSize) { + // If policy is not supported in current system, set the policy state to + // DISABLED; + policy.setState(ErasureCodingPolicyState.DISABLED); + } + + this.policiesByName.put(policy.getName(), policy); + this.policiesByID.put(policy.getId(), policy); + if (policy.isEnabled()) { + enablePolicy(policy.getName()); + } + } + + /** + * Reload erasure coding policies from fsImage. 
+ * + * @param ecPolicies contains ErasureCodingPolicy list + * + */ + public synchronized void loadPolicies(List<ErasureCodingPolicy> ecPolicies) { + Preconditions.checkNotNull(ecPolicies); + for (ErasureCodingPolicy p : ecPolicies) { + loadPolicy(p); + } + allPolicies = policiesByName.values().toArray(new ErasureCodingPolicy[0]); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index 9f8be89d64a..5e60038ce7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -334,10 +334,10 @@ public final class FSImageFormatPBINode { boolean isStriped = f.hasErasureCodingPolicyID(); assert ((!isStriped) || (isStriped && !f.hasReplication())); Short replication = (!isStriped ? (short) f.getReplication() : null); + Byte ecPolicyID = (isStriped ? + (byte) f.getErasureCodingPolicyID() : null); ErasureCodingPolicy ecPolicy = isStriped ? - fsn.getErasureCodingPolicyManager().getByID( - (byte) f.getErasureCodingPolicyID()) : null; - Byte ecPolicyID = (isStriped ?
ecPolicy.getId() : null); + fsn.getErasureCodingPolicyManager().getByID(ecPolicyID) : null; BlockInfo[] blocks = new BlockInfo[bp.size()]; for (int i = 0; i < bp.size(); ++i) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java index 22331fe0b33..ad8cdfcb138 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java @@ -36,10 +36,13 @@ import java.security.MessageDigest; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +50,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -55,6 +59,7 @@ import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection; import 
org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection; +import org.apache.hadoop.hdfs.server.namenode.FsImageProto.ErasureCodingSection; import org.apache.hadoop.hdfs.server.namenode.snapshot.FSImageFormatPBSnapshot; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; @@ -287,6 +292,12 @@ public final class FSImageFormatProtobuf { prog.endStep(Phase.LOADING_FSIMAGE, step); } break; + case ERASURE_CODING: + Step step = new Step(StepType.ERASURE_CODING_POLICIES); + prog.beginStep(Phase.LOADING_FSIMAGE, step); + loadErasureCodingSection(in); + prog.endStep(Phase.LOADING_FSIMAGE, step); + break; default: LOG.warn("Unrecognized section {}", n); break; @@ -366,6 +377,17 @@ public final class FSImageFormatProtobuf { new CacheManager.PersistState(s, pools, directives)); } + private void loadErasureCodingSection(InputStream in) + throws IOException { + ErasureCodingSection s = ErasureCodingSection.parseDelimitedFrom(in); + List<ErasureCodingPolicy> ecPolicies = Lists + .newArrayListWithCapacity(s.getPoliciesCount()); + for (int i = 0; i < s.getPoliciesCount(); ++i) { + ecPolicies.add(PBHelperClient.convertErasureCodingPolicy( + s.getPolicies(i))); + } + fsn.getErasureCodingPolicyManager().loadPolicies(ecPolicies); + } } public static final class Saver { @@ -497,7 +519,13 @@ public final class FSImageFormatProtobuf { // depends on this behavior. 
context.checkCancelled(); - Step step = new Step(StepType.INODES, filePath); + // Erasure coding policies should be saved before inodes + Step step = new Step(StepType.ERASURE_CODING_POLICIES, filePath); + prog.beginStep(Phase.SAVING_CHECKPOINT, step); + saveErasureCodingSection(b); + prog.endStep(Phase.SAVING_CHECKPOINT, step); + + step = new Step(StepType.INODES, filePath); prog.beginStep(Phase.SAVING_CHECKPOINT, step); saveInodes(b); saveSnapshots(b); @@ -555,6 +583,23 @@ public final class FSImageFormatProtobuf { commitSection(summary, SectionName.CACHE_MANAGER); } + private void saveErasureCodingSection( + FileSummary.Builder summary) throws IOException { + final FSNamesystem fsn = context.getSourceNamesystem(); + ErasureCodingPolicy[] ecPolicies = + fsn.getErasureCodingPolicyManager().getPolicies(); + ArrayList<ErasureCodingPolicyProto> ecPolicyProtoes = + new ArrayList<ErasureCodingPolicyProto>(); + for (ErasureCodingPolicy p : ecPolicies) { + ecPolicyProtoes.add(PBHelperClient.convertErasureCodingPolicy(p)); + } + + ErasureCodingSection section = ErasureCodingSection.newBuilder(). 
+ addAllPolicies(ecPolicyProtoes).build(); + section.writeDelimitedTo(sectionOutputStream); + commitSection(summary, SectionName.ERASURE_CODING); + } + private void saveNameSystemSection(FileSummary.Builder summary) throws IOException { final FSNamesystem fsn = context.getSourceNamesystem(); @@ -606,6 +651,7 @@ public final class FSImageFormatProtobuf { NS_INFO("NS_INFO"), STRING_TABLE("STRING_TABLE"), EXTENDED_ACL("EXTENDED_ACL"), + ERASURE_CODING("ERASURE_CODING"), INODE("INODE"), INODE_REFERENCE("INODE_REFERENCE"), SNAPSHOT("SNAPSHOT"), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java index 1b43d6a2b09..83cf6cffac0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java @@ -52,7 +52,12 @@ public enum StepType { /** * The namenode is performing an operation related to cache entries. */ - CACHE_ENTRIES("CacheEntries", "cache entries"); + CACHE_ENTRIES("CacheEntries", "cache entries"), + + /** + * The namenode is performing an operation related to erasure coding policies. 
+ */ + ERASURE_CODING_POLICIES("ErasureCodingPolicies", "erasure coding policies"); private final String name, description; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto index 4e21310d2a3..101a0605acb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto @@ -346,3 +346,7 @@ message CacheManagerSection { // repeated CachePoolInfoProto pools // repeated CacheDirectiveInfoProto directives } + +message ErasureCodingSection { + repeated ErasureCodingPolicyProto policies = 1; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java index 3c549b185e2..19277c447b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java @@ -209,11 +209,6 @@ public class TestErasureCodingPolicies { cluster.restartNameNodes(); cluster.waitActive(); - // Only default policy should be enabled after restart - Assert.assertEquals("Only default policy should be enabled after restart", - 1, - ErasureCodingPolicyManager.getInstance().getEnabledPolicies().length); - // Already set directory-level policies should still be in effect Path disabledPolicy = new Path(dir1, "afterDisabled"); Assert.assertEquals("Dir does not have policy set", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java index 16f625891b1..c9d3255a310 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -34,7 +35,9 @@ import java.io.IOException; import java.util.EnumSet; import org.apache.hadoop.hdfs.StripedFileTestUtil; +import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState; import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; @@ -43,6 +46,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.protocol.BlockType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.ipc.RemoteException; import org.junit.Assert; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -810,4 +815,150 @@ public class TestFSImage { } } } + + /** + * Test persist and load erasure coding policies. 
+ */ + @Test + public void testSaveAndLoadErasureCodingPolicies() throws IOException{ + Configuration conf = new Configuration(); + final int blockSize = 16 * 1024 * 1024; + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + try (MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(10).build()) { + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + DFSTestUtil.enableAllECPolicies(fs); + + // Save namespace and restart NameNode + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + fs.saveNamespace(); + fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + + cluster.restartNameNodes(); + cluster.waitActive(); + + assertEquals("Erasure coding policy number should match", + SystemErasureCodingPolicies.getPolicies().size(), + ErasureCodingPolicyManager.getInstance().getPolicies().length); + + // Add new erasure coding policy + ECSchema newSchema = new ECSchema("rs", 5, 4); + ErasureCodingPolicy newPolicy = + new ErasureCodingPolicy(newSchema, 2 * 1024, (byte) 254); + ErasureCodingPolicy[] policies = new ErasureCodingPolicy[]{newPolicy}; + AddECPolicyResponse[] ret = fs.addErasureCodingPolicies(policies); + assertEquals(1, ret.length); + assertEquals(true, ret[0].isSucceed()); + newPolicy = ret[0].getPolicy(); + + // Save namespace and restart NameNode + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + fs.saveNamespace(); + fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + + cluster.restartNameNodes(); + cluster.waitActive(); + + assertEquals("Erasure coding policy number should match", + SystemErasureCodingPolicies.getPolicies().size() + 1, + ErasureCodingPolicyManager.getInstance().getPolicies().length); + ErasureCodingPolicy ecPolicy = + ErasureCodingPolicyManager.getInstance().getByID(newPolicy.getId()); + assertEquals("Newly added erasure coding policy is not found", + newPolicy, ecPolicy); + assertEquals( + "Newly added erasure coding policy should be of disabled state", + ErasureCodingPolicyState.DISABLED, 
ecPolicy.getState()); + + // Test enable/disable/remove user customized erasure coding policy + testChangeErasureCodingPolicyState(cluster, blockSize, newPolicy); + // Test enable/disable built-in erasure coding policy + testChangeErasureCodingPolicyState(cluster, blockSize, + SystemErasureCodingPolicies.getByID((byte) 1)); + } + } + + + private void testChangeErasureCodingPolicyState(MiniDFSCluster cluster, + int blockSize, ErasureCodingPolicy targetPolicy) throws IOException { + DistributedFileSystem fs = cluster.getFileSystem(); + + // 1. Enable an erasure coding policy + fs.enableErasureCodingPolicy(targetPolicy.getName()); + targetPolicy.setState(ErasureCodingPolicyState.ENABLED); + // Create file, using the new policy + final Path dirPath = new Path("/striped"); + final Path filePath = new Path(dirPath, "file"); + final int fileLength = blockSize * targetPolicy.getNumDataUnits(); + fs.mkdirs(dirPath); + fs.setErasureCodingPolicy(dirPath, targetPolicy.getName()); + final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength); + DFSTestUtil.writeFile(fs, filePath, bytes); + + + // Save namespace and restart NameNode + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + fs.saveNamespace(); + fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + + cluster.restartNameNodes(); + cluster.waitActive(); + ErasureCodingPolicy ecPolicy = + ErasureCodingPolicyManager.getInstance().getByID(targetPolicy.getId()); + assertEquals("The erasure coding policy is not found", + targetPolicy, ecPolicy); + assertEquals("The erasure coding policy should be of enabled state", + ErasureCodingPolicyState.ENABLED, ecPolicy.getState()); + // Read file regardless of the erasure coding policy state + DFSTestUtil.readFileAsBytes(fs, filePath); + + // 2. 
Disable an erasure coding policy + fs.disableErasureCodingPolicy(ecPolicy.getName()); + targetPolicy.setState(ErasureCodingPolicyState.DISABLED); + // Save namespace and restart NameNode + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + fs.saveNamespace(); + fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + + cluster.restartNameNodes(); + cluster.waitActive(); + ecPolicy = + ErasureCodingPolicyManager.getInstance().getByID(targetPolicy.getId()); + assertEquals("The erasure coding policy is not found", + targetPolicy, ecPolicy); + assertEquals("The erasure coding policy should be of disabled state", + ErasureCodingPolicyState.DISABLED, ecPolicy.getState()); + // Read file regardless of the erasure coding policy state + DFSTestUtil.readFileAsBytes(fs, filePath); + + // 3. Remove an erasure coding policy + try { + fs.removeErasureCodingPolicy(ecPolicy.getName()); + } catch (RemoteException e) { + // built-in policy cannot be removed + assertTrue("Built-in policy cannot be removed", + ecPolicy.isSystemPolicy()); + assertExceptionContains("System erasure coding policy", e); + return; + } + + targetPolicy.setState(ErasureCodingPolicyState.REMOVED); + // Save namespace and restart NameNode + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + fs.saveNamespace(); + fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + + cluster.restartNameNodes(); + cluster.waitActive(); + ecPolicy = ErasureCodingPolicyManager.getInstance().getByID( + targetPolicy.getId()); + assertEquals("The erasure coding policy saved into and loaded from " + + "fsImage is bad", targetPolicy, ecPolicy); + assertEquals("The erasure coding policy should be of removed state", + ErasureCodingPolicyState.REMOVED, ecPolicy.getState()); + // Read file regardless of the erasure coding policy state + DFSTestUtil.readFileAsBytes(fs, filePath); + fs.delete(dirPath, true); + } }