HDFS-8865. Improve quota initialization performance. Contributed by Kihwal Lee.

(cherry picked from commit b6ceee9bf4)
This commit is contained in:
Kihwal Lee 2015-08-28 13:19:28 -05:00
parent 792b9c0eaf
commit 5184779418
8 changed files with 194 additions and 55 deletions

View File

@ -508,6 +508,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8962. Clean up checkstyle warnings in o.a.h.hdfs.DfsClientConf.
(Mingliang Liu via wheat9)
HDFS-8865. Improve quota initialization performance. (kihwal)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -210,6 +210,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum";
public static final int DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1;
/**
 * Number of concurrent threads used to initialize quota counts
 * ("dfs.namenode.quota.init-threads"). Larger values speed up quota
 * initialization for big namespaces.
 */
public static final String DFS_NAMENODE_QUOTA_INIT_THREADS_KEY = "dfs.namenode.quota.init-threads";
public static final int DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT = 4;
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD = "dfs.namenode.edit.log.autoroll.multiplier.threshold";
public static final float DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f;

View File

@ -25,6 +25,7 @@ import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@ -83,6 +84,8 @@ public class BackupImage extends FSImage {
private FSNamesystem namesystem;
private int quotaInitThreads;
/**
* Construct a backup image.
* @param conf Configuration
@ -92,6 +95,9 @@ public class BackupImage extends FSImage {
super(conf);
storage.setDisablePreUpgradableLayoutCheck(true);
bnState = BNState.DROP_UNTIL_NEXT_ROLL;
quotaInitThreads = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);
}
synchronized FSNamesystem getNamesystem() {
@ -231,7 +237,7 @@ public class BackupImage extends FSImage {
FSImage.updateCountForQuota(
getNamesystem().dir.getBlockStoragePolicySuite(),
getNamesystem().dir.rootDir); // inefficient!
getNamesystem().dir.rootDir, quotaInitThreads);
} finally {
backupInputStream.clear();
}

View File

@ -27,6 +27,8 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -70,6 +72,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.Time;
@ -100,6 +103,7 @@ public class FSImage implements Closeable {
final private Configuration conf;
protected NNStorageRetentionManager archivalManager;
private int quotaInitThreads;
/* Used to make sure there are no concurrent checkpoints for a given txid
* The checkpoint here could be one of the following operations.
@ -143,6 +147,10 @@ public class FSImage implements Closeable {
storage.setRestoreFailedStorage(true);
}
this.quotaInitThreads = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);
this.editLog = new FSEditLog(conf, storage, editsDirs);
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
@ -851,7 +859,7 @@ public class FSImage implements Closeable {
FSEditLog.closeAllStreams(editStreams);
// update the counts
updateCountForQuota(target.getBlockManager().getStoragePolicySuite(),
target.dir.rootDir);
target.dir.rootDir, quotaInitThreads);
}
prog.endPhase(Phase.LOADING_EDITS);
return lastAppliedTxId - prevLastAppliedTxId;
@ -866,65 +874,104 @@ public class FSImage implements Closeable {
* throw QuotaExceededException.
*/
static void updateCountForQuota(BlockStoragePolicySuite bsps,
INodeDirectory root) {
updateCountForQuotaRecursively(bsps, root.getStoragePolicyID(), root,
new QuotaCounts.Builder().build());
}
INodeDirectory root, int threads) {
threads = (threads < 1) ? 1 : threads;
LOG.info("Initializing quota with " + threads + " thread(s)");
long start = Time.now();
QuotaCounts counts = new QuotaCounts.Builder().build();
ForkJoinPool p = new ForkJoinPool(threads);
RecursiveAction task = new InitQuotaTask(bsps, root.getStoragePolicyID(),
root, counts);
p.execute(task);
task.join();
LOG.info("Quota initialization completed in " + (Time.now() - start) +
" milliseconds\n" + counts);
}
private static void updateCountForQuotaRecursively(BlockStoragePolicySuite bsps,
byte blockStoragePolicyId, INodeDirectory dir, QuotaCounts counts) {
final long parentNamespace = counts.getNameSpace();
final long parentStoragespace = counts.getStorageSpace();
final EnumCounters<StorageType> parentTypeSpaces = counts.getTypeSpaces();
/**
* parallel initialization using fork-join.
*/
private static class InitQuotaTask extends RecursiveAction {
private final INodeDirectory dir;
private final QuotaCounts counts;
private final BlockStoragePolicySuite bsps;
private final byte blockStoragePolicyId;
dir.computeQuotaUsage4CurrentDirectory(bsps, blockStoragePolicyId, counts);
for (INode child : dir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
final byte childPolicyId = child.getStoragePolicyIDForQuota(blockStoragePolicyId);
if (child.isDirectory()) {
updateCountForQuotaRecursively(bsps, childPolicyId,
child.asDirectory(), counts);
} else {
// file or symlink: count here to reduce recursive calls.
counts.add(child.computeQuotaUsage(bsps, childPolicyId, false,
Snapshot.CURRENT_STATE_ID));
}
public InitQuotaTask(BlockStoragePolicySuite bsps,
byte blockStoragePolicyId, INodeDirectory dir, QuotaCounts counts) {
this.dir = dir;
this.counts = counts;
this.bsps = bsps;
this.blockStoragePolicyId = blockStoragePolicyId;
}
if (dir.isQuotaSet()) {
// check if quota is violated. It indicates a software bug.
final QuotaCounts q = dir.getQuotaCounts();
final long namespace = counts.getNameSpace() - parentNamespace;
final long nsQuota = q.getNameSpace();
if (Quota.isViolated(nsQuota, namespace)) {
LOG.warn("Namespace quota violation in image for "
+ dir.getFullPathName()
+ " quota = " + nsQuota + " < consumed = " + namespace);
}
public void compute() {
QuotaCounts myCounts = new QuotaCounts.Builder().build();
dir.computeQuotaUsage4CurrentDirectory(bsps, blockStoragePolicyId,
myCounts);
final long ssConsumed = counts.getStorageSpace() - parentStoragespace;
final long ssQuota = q.getStorageSpace();
if (Quota.isViolated(ssQuota, ssConsumed)) {
LOG.warn("Storagespace quota violation in image for "
+ dir.getFullPathName()
+ " quota = " + ssQuota + " < consumed = " + ssConsumed);
}
ReadOnlyList<INode> children =
dir.getChildrenList(Snapshot.CURRENT_STATE_ID);
final EnumCounters<StorageType> typeSpaces = counts.getTypeSpaces();
for (StorageType t : StorageType.getTypesSupportingQuota()) {
final long typeSpace = typeSpaces.get(t) - parentTypeSpaces.get(t);
final long typeQuota = q.getTypeSpaces().get(t);
if (Quota.isViolated(typeQuota, typeSpace)) {
LOG.warn("Storage type quota violation in image for "
+ dir.getFullPathName()
+ " type = " + t.toString() + " quota = "
+ typeQuota + " < consumed " + typeSpace);
if (children.size() > 0) {
List<InitQuotaTask> subtasks = new ArrayList<InitQuotaTask>();
for (INode child : children) {
final byte childPolicyId =
child.getStoragePolicyIDForQuota(blockStoragePolicyId);
if (child.isDirectory()) {
subtasks.add(new InitQuotaTask(bsps, childPolicyId,
child.asDirectory(), myCounts));
} else {
// file or symlink. count using the local counts variable
myCounts.add(child.computeQuotaUsage(bsps, childPolicyId, false,
Snapshot.CURRENT_STATE_ID));
}
}
// invoke and wait for completion
invokeAll(subtasks);
}
dir.getDirectoryWithQuotaFeature().setSpaceConsumed(namespace, ssConsumed,
typeSpaces);
if (dir.isQuotaSet()) {
// check if quota is violated. It indicates a software bug.
final QuotaCounts q = dir.getQuotaCounts();
final long nsConsumed = myCounts.getNameSpace();
final long nsQuota = q.getNameSpace();
if (Quota.isViolated(nsQuota, nsConsumed)) {
LOG.warn("Namespace quota violation in image for "
+ dir.getFullPathName()
+ " quota = " + nsQuota + " < consumed = " + nsConsumed);
}
final long ssConsumed = myCounts.getStorageSpace();
final long ssQuota = q.getStorageSpace();
if (Quota.isViolated(ssQuota, ssConsumed)) {
LOG.warn("Storagespace quota violation in image for "
+ dir.getFullPathName()
+ " quota = " + ssQuota + " < consumed = " + ssConsumed);
}
final EnumCounters<StorageType> tsConsumed = myCounts.getTypeSpaces();
for (StorageType t : StorageType.getTypesSupportingQuota()) {
final long typeSpace = tsConsumed.get(t);
final long typeQuota = q.getTypeSpaces().get(t);
if (Quota.isViolated(typeQuota, typeSpace)) {
LOG.warn("Storage type quota violation in image for "
+ dir.getFullPathName()
+ " type = " + t.toString() + " quota = "
+ typeQuota + " < consumed " + typeSpace);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Setting quota for " + dir + "\n" + myCounts);
}
dir.getDirectoryWithQuotaFeature().setSpaceConsumed(nsConsumed,
ssConsumed, tsConsumed);
}
synchronized(counts) {
counts.add(myCounts);
}
}
}

View File

@ -159,6 +159,13 @@ public class QuotaCounts {
return tsCounts.anyGreaterOrEqual(val);
}
@Override
public String toString() {
  // Multi-line rendering: one line per quota dimension.
  StringBuilder sb = new StringBuilder();
  sb.append("name space=").append(getNameSpace());
  sb.append("\nstorage space=").append(getStorageSpace());
  sb.append("\nstorage types=").append(getTypeSpaces());
  return sb.toString();
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
@ -176,4 +183,5 @@ public class QuotaCounts {
assert false : "hashCode not designed";
return 42; // any arbitrary constant will do
}
}
}

View File

@ -2414,4 +2414,14 @@
</description>
</property>
<property>
<name>dfs.namenode.quota.init-threads</name>
<value>4</value>
<description>
The number of concurrent threads to be used in quota initialization. The
speed of quota initialization also affects the namenode fail-over latency.
If the size of the name space is large, try increasing this value.
</description>
</property>
</configuration>

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.EnumSet;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.Assert;
@ -310,4 +312,66 @@ public class TestDiskspaceQuotaUpdate {
dfs.recoverLease(file);
cluster.restartNameNodes();
}
/**
 * Check whether the quota is initialized correctly.
 */
@Test
public void testQuotaInitialization() throws Exception {
  final int dirCount = 500;
  final long expectedSize = 3 * BLOCKSIZE + BLOCKSIZE / 2;

  // Build a tree of quota-enabled directories, each holding one file.
  Path testDir = new Path("/testDir");
  dfs.mkdirs(testDir);
  dfs.setQuota(testDir, dirCount * 4, expectedSize * dirCount * 2);
  Path[] testDirs = new Path[dirCount];
  for (int i = 0; i < dirCount; i++) {
    testDirs[i] = new Path(testDir, "sub" + i);
    dfs.mkdirs(testDirs[i]);
    dfs.setQuota(testDirs[i], 100, 1000000);
    DFSTestUtil.createFile(dfs, new Path(testDirs[i], "a"), expectedSize,
        (short) 1, 1L);
  }

  // Directly access the name system to obtain the current cached usage.
  INodeDirectory root = fsdir.getRoot();
  HashMap<String, Long> nsMap = new HashMap<String, Long>();
  HashMap<String, Long> dsMap = new HashMap<String, Long>();
  scanDirsWithQuota(root, nsMap, dsMap, false);

  // Re-run quota initialization with varying thread counts and verify
  // that the recomputed usage matches the cached usage every time.
  for (int threads : new int[] {1, 2, 4}) {
    FSImage.updateCountForQuota(
        fsdir.getBlockManager().getStoragePolicySuite(), root, threads);
    scanDirsWithQuota(root, nsMap, dsMap, true);
  }
}
/**
 * Walk the tree rooted at {@code dir}. For every quota-enabled directory,
 * either record its cached space consumption into the maps
 * ({@code verify == false}) or assert that the cached consumption matches
 * the previously recorded values ({@code verify == true}).
 */
private void scanDirsWithQuota(INodeDirectory dir,
    HashMap<String, Long> nsMap,
    HashMap<String, Long> dsMap, boolean verify) {
  if (dir.isQuotaSet()) {
    final QuotaCounts usage =
        dir.getDirectoryWithQuotaFeature().getSpaceConsumed();
    final String path = dir.getFullPathName();
    if (!verify) {
      // record the current consumption
      nsMap.put(path, Long.valueOf(usage.getNameSpace()));
      dsMap.put(path, Long.valueOf(usage.getStorageSpace()));
    } else {
      assertEquals(nsMap.get(path).longValue(), usage.getNameSpace());
      assertEquals(dsMap.get(path).longValue(), usage.getStorageSpace());
    }
  }
  // Recurse into child directories only; files carry no quota feature.
  for (INode child : dir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
    if (child instanceof INodeDirectory) {
      scanDirsWithQuota((INodeDirectory) child, nsMap, dsMap, verify);
    }
  }
}
}

View File

@ -159,7 +159,7 @@ public class TestFSImageWithSnapshot {
try {
loader.load(imageFile, false);
FSImage.updateCountForQuota(fsn.getBlockManager().getStoragePolicySuite(),
INodeDirectory.valueOf(fsn.getFSDirectory().getINode("/"), "/"));
INodeDirectory.valueOf(fsn.getFSDirectory().getINode("/"), "/"), 4);
} finally {
fsn.getFSDirectory().writeUnlock();
fsn.writeUnlock();
@ -509,4 +509,4 @@ public class TestFSImageWithSnapshot {
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
}
}
}