HDFS-15830. Support to make dfs.image.parallel.load reconfigurable (#2694)

This commit is contained in:
Hui Fei 2021-02-19 09:07:22 +08:00 committed by GitHub
parent e391844e8e
commit 2970bd93f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 56 additions and 10 deletions

View File

@ -172,6 +172,7 @@ public class FSImage implements Closeable {
this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
FSImageFormatProtobuf.initParallelLoad(conf);
}
void format(FSNamesystem fsn, String clusterId, boolean force)

View File

@ -242,6 +242,7 @@ public class FSImageFormat {
* the layout version.
*/
public static LoaderDelegator newLoader(Configuration conf, FSNamesystem fsn) {
return new LoaderDelegator(conf, fsn);
}

View File

@ -88,6 +88,8 @@ public final class FSImageFormatProtobuf {
private static final Logger LOG = LoggerFactory
.getLogger(FSImageFormatProtobuf.class);
private static volatile boolean enableParallelLoad = false;
public static final class LoaderContext {
private SerialNumberManager.StringTable stringTable;
private final ArrayList<INodeReference> refList = Lists.newArrayList();
@ -582,9 +584,7 @@ public final class FSImageFormatProtobuf {
}
private static boolean enableParallelSaveAndLoad(Configuration conf) {
boolean loadInParallel =
conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
boolean loadInParallel = enableParallelLoad;
boolean compressionEnabled = conf.getBoolean(
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
@ -600,6 +600,20 @@ public final class FSImageFormatProtobuf {
return loadInParallel;
}
public static void initParallelLoad(Configuration conf) {
enableParallelLoad =
conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
}
public static void refreshParallelSaveAndLoad(boolean enable) {
enableParallelLoad = enable;
}
public static boolean getEnableParallelLoad() {
return enableParallelLoad;
}
public static final class Saver {
public static final int CHECK_CANCEL_INTERVAL = 4096;
private boolean writeSubSections = false;
@ -640,10 +654,6 @@ public final class FSImageFormatProtobuf {
return inodesPerSubSection;
}
public boolean shouldWriteSubSections() {
return writeSubSections;
}
/**
* Commit the length and offset of a fsimage section to the summary index,
* including the sub section, which will be committed before the section is

View File

@ -123,6 +123,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERV
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
@ -325,7 +327,8 @@ public class NameNode extends ReconfigurableBase implements
DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY));
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
DFS_IMAGE_PARALLEL_LOAD_KEY));
private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
@ -2184,6 +2187,8 @@ public class NameNode extends ReconfigurableBase implements
.equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
reconfBlockPlacementPolicy();
return newVal;
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
return reconfigureParallelLoad(newVal);
} else {
throw new ReconfigurationException(property, newVal, getConf().get(
property));
@ -2359,6 +2364,17 @@ public class NameNode extends ReconfigurableBase implements
return newVal;
}
String reconfigureParallelLoad(String newVal) {
boolean enableParallelLoad;
if (newVal == null) {
enableParallelLoad = DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
} else {
enableParallelLoad = Boolean.parseBoolean(newVal);
}
FSImageFormatProtobuf.refreshParallelSaveAndLoad(enableParallelLoad);
return Boolean.toString(enableParallelLoad);
}
@Override // ReconfigurableBase
protected Configuration getNewConf() {
return new HdfsConfiguration();

View File

@ -24,6 +24,7 @@ import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.junit.Assert.*;
import org.slf4j.Logger;
@ -378,6 +379,21 @@ public class TestNameNodeReconfigure {
datanodeManager.getBlockInvalidateLimit());
}
@Test
public void testEnableParallelLoadAfterReconfigured()
throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
// By default, enableParallelLoad is false
assertEquals(false, FSImageFormatProtobuf.getEnableParallelLoad());
nameNode.reconfigureProperty(DFS_IMAGE_PARALLEL_LOAD_KEY,
Boolean.toString(true));
// After reconfigured, enableParallelLoad is true
assertEquals(true, FSImageFormatProtobuf.getEnableParallelLoad());
}
@After
public void shutDown() throws IOException {
if (cluster != null) {

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONN
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
@ -429,11 +430,12 @@ public class TestDFSAdmin {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
assertEquals(12, outs.size());
assertEquals(13, outs.size());
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(4));
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(5));
assertEquals(errs.size(), 0);
}