diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index ccc69349297..86b4150777e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -172,6 +172,7 @@ public class FSImage implements Closeable {
 
     this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
     archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
+    FSImageFormatProtobuf.initParallelLoad(conf);
   }
 
   void format(FSNamesystem fsn, String clusterId, boolean force)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index dcc60bf5e71..478cec55d0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -242,6 +242,7 @@ public class FSImageFormat {
    * the layout version.
    */
   public static LoaderDelegator newLoader(Configuration conf, FSNamesystem fsn) {
+    return new LoaderDelegator(conf, fsn);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
index 5c40efc797c..7556727e629 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
@@ -88,6 +88,8 @@ public final class FSImageFormatProtobuf {
   private static final Logger LOG = LoggerFactory
       .getLogger(FSImageFormatProtobuf.class);
 
+  private static volatile boolean enableParallelLoad = false;
+
   public static final class LoaderContext {
     private SerialNumberManager.StringTable stringTable;
     private final ArrayList<INodeReference> refList = Lists.newArrayList();
@@ -582,9 +584,7 @@ public final class FSImageFormatProtobuf {
   }
 
   private static boolean enableParallelSaveAndLoad(Configuration conf) {
-    boolean loadInParallel =
-        conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
-            DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
+    boolean loadInParallel = enableParallelLoad;
     boolean compressionEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
         DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
@@ -600,6 +600,20 @@ public final class FSImageFormatProtobuf {
     return loadInParallel;
   }
 
+  public static void initParallelLoad(Configuration conf) {
+    enableParallelLoad =
+        conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
+            DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
+  }
+
+  public static void refreshParallelSaveAndLoad(boolean enable) {
+    enableParallelLoad = enable;
+  }
+
+  public static boolean getEnableParallelLoad() {
+    return enableParallelLoad;
+  }
+
   public static final class Saver {
     public static final int CHECK_CANCEL_INTERVAL = 4096;
     private boolean writeSubSections = false;
@@ -640,10 +654,6 @@ public final class FSImageFormatProtobuf {
       return inodesPerSubSection;
     }
 
-    public boolean shouldWriteSubSections() {
-      return writeSubSections;
-    }
-
     /**
      * Commit the length and offset of a fsimage section to the summary index,
      * including the sub section, which will be committed before the section is
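The heart of the change is in FSImageFormatProtobuf above: enableParallelSaveAndLoad no longer re-reads dfs.image.parallel.load from the Configuration on every call, but consults a static volatile field that the FSImage constructor seeds via initParallelLoad and that a later reconfiguration overwrites via refreshParallelSaveAndLoad. The volatile modifier matters because the write happens on the reconfiguration RPC handler thread while the read happens on whichever thread next saves or loads an fsimage. A minimal, self-contained sketch of that pattern follows; the class and member names are illustrative, not part of the patch:

    import java.util.concurrent.TimeUnit;

    public class ParallelLoadFlag {
      // volatile: a write by the "reconfiguration" thread is guaranteed
      // to become visible to the thread that later loads or saves an image
      private static volatile boolean enabled = false;

      static void refresh(boolean enable) {  // cf. refreshParallelSaveAndLoad
        enabled = enable;
      }

      static boolean isEnabled() {           // cf. getEnableParallelLoad
        return enabled;
      }

      public static void main(String[] args) throws InterruptedException {
        Thread loader = new Thread(() -> {
          while (!isEnabled()) {
            // spin until the write below becomes visible; without volatile
            // the JVM could legally keep this loop running forever
          }
          System.out.println("loader sees enabled=true");
        });
        loader.start();
        TimeUnit.MILLISECONDS.sleep(100);  // an admin acts some time later
        refresh(true);                     // the reconfiguration write
        loader.join();
      }
    }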
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 3225bab32a0..54671ea6cf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -123,6 +123,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERV
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
@@ -325,7 +327,8 @@ public class NameNode extends ReconfigurableBase implements
       DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
       DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
       DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
-      DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY));
+      DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
+      DFS_IMAGE_PARALLEL_LOAD_KEY));
 
   private static final String USAGE = "Usage: hdfs namenode ["
       + StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2184,6 +2187,8 @@ public class NameNode extends ReconfigurableBase implements
         .equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
       reconfBlockPlacementPolicy();
       return newVal;
+    } else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
+      return reconfigureParallelLoad(newVal);
     } else {
       throw new ReconfigurationException(property, newVal, getConf().get(
           property));
@@ -2359,6 +2364,17 @@ public class NameNode extends ReconfigurableBase implements
     return newVal;
   }
 
+  String reconfigureParallelLoad(String newVal) {
+    boolean enableParallelLoad;
+    if (newVal == null) {
+      enableParallelLoad = DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
+    } else {
+      enableParallelLoad = Boolean.parseBoolean(newVal);
+    }
+    FSImageFormatProtobuf.refreshParallelSaveAndLoad(enableParallelLoad);
+    return Boolean.toString(enableParallelLoad);
+  }
+
   @Override // ReconfigurableBase
   protected Configuration getNewConf() {
     return new HdfsConfiguration();
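Note the value handling in reconfigureParallelLoad above: a null newVal (the property being unset) falls back to DFS_IMAGE_PARALLEL_LOAD_DEFAULT, and anything else goes through Boolean.parseBoolean, which is case-insensitive but maps every string other than "true" to false, so a mistyped value quietly disables parallel loading rather than raising a ReconfigurationException. A standalone sketch of just that logic; the DEFAULT constant stands in for DFS_IMAGE_PARALLEL_LOAD_DEFAULT, which is false:

    public class ParseDemo {
      private static final boolean DEFAULT = false;

      // mirrors the null-check-then-parse logic of reconfigureParallelLoad
      static boolean parse(String newVal) {
        return newVal == null ? DEFAULT : Boolean.parseBoolean(newVal);
      }

      public static void main(String[] args) {
        System.out.println(parse(null));    // false: falls back to the default
        System.out.println(parse("TRUE"));  // true: parsing is case-insensitive
        System.out.println(parse("ture"));  // false: a typo silently disables
      }
    }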
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
index 3265bed80c1..ada7c82150a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
@@ -24,6 +24,7 @@ import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
 import static org.junit.Assert.*;
 
 import org.slf4j.Logger;
@@ -378,6 +379,21 @@ public class TestNameNodeReconfigure {
         datanodeManager.getBlockInvalidateLimit());
   }
 
+  @Test
+  public void testEnableParallelLoadAfterReconfigured()
+      throws ReconfigurationException {
+    final NameNode nameNode = cluster.getNameNode();
+
+    // By default, enableParallelLoad is false
+    assertEquals(false, FSImageFormatProtobuf.getEnableParallelLoad());
+
+    nameNode.reconfigureProperty(DFS_IMAGE_PARALLEL_LOAD_KEY,
+        Boolean.toString(true));
+
+    // After reconfiguration, enableParallelLoad is true
+    assertEquals(true, FSImageFormatProtobuf.getEnableParallelLoad());
+  }
+
   @After
   public void shutDown() throws IOException {
     if (cluster != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 278db064fdd..736d66f2f4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONN
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
@@ -429,11 +430,12 @@ public class TestDFSAdmin {
 
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("namenode", address, outs, errs);
-    assertEquals(12, outs.size());
+    assertEquals(13, outs.size());
     assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
     assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
     assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
-    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(4));
+    assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4));
+    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(5));
     assertEquals(errs.size(), 0);
   }
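The index shift in the TestDFSAdmin hunk follows from the order in which dfsadmin reports reconfigurable properties: the assertions begin at index 1 (index 0 is presumably a header line) and run in lexicographic key order, so dfs.image.parallel.load lands between dfs.heartbeat.interval and dfs.namenode.heartbeat.recheck-interval, taking slot 4 and pushing the recheck-interval key to slot 5. A quick check of that ordering:

    import java.util.TreeSet;

    public class OrderCheck {
      public static void main(String[] args) {
        TreeSet<String> keys = new TreeSet<>();  // iterates in natural order
        keys.add("dfs.namenode.heartbeat.recheck-interval"); // now outs.get(5)
        keys.add("dfs.heartbeat.interval");                  // outs.get(3)
        keys.add("dfs.image.parallel.load");                 // the new outs.get(4)
        keys.add("dfs.block.replicator.classname");          // outs.get(2)
        keys.add("dfs.block.placement.ec.classname");        // outs.get(1)
        keys.forEach(System.out::println);  // prints in the asserted order
      }
    }

With the key registered in reconfigurePropertyImpl, operators can flip it at runtime through the existing admin flow, e.g. hdfs dfsadmin -reconfig namenode <host:ipc_port> start, instead of restarting the NameNode.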