svn merge -c 1541971 merging from trunk to branch-2 to fix HDFS-4995 Make getContentSummary less expensive.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1541975 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2013-11-14 16:55:31 +00:00
parent 2d39336213
commit f62fbc7f17
15 changed files with 280 additions and 43 deletions

View File

@ -114,6 +114,8 @@ Release 2.3.0 - UNRELEASED
HDFS-5487. Introduce unit test for TokenAspect. (Haohui Mai via jing9) HDFS-5487. Introduce unit test for TokenAspect. (Haohui Mai via jing9)
HDFS-4995. Make getContentSummary less expensive. (kihwal)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)

View File

@ -192,6 +192,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_LIST_LIMIT = "dfs.ls.limit"; public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000; public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";
public static final int DFS_CONTENT_SUMMARY_LIMIT_DEFAULT = 0;
public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated"; public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0; public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose"; public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ContentSummaryComputationContext {
private FSDirectory dir = null;
private FSNamesystem fsn = null;
private Content.Counts counts = null;
private long nextCountLimit = 0;
private long limitPerRun = 0;
private long yieldCount = 0;
/**
* Constructor
*
* @param dir The FSDirectory instance
* @param fsn The FSNamesystem instance
* @param limitPerRun allowed number of operations in one
* locking period. 0 or a negative number means
* no limit (i.e. no yielding)
*/
public ContentSummaryComputationContext(FSDirectory dir,
FSNamesystem fsn, long limitPerRun) {
this.dir = dir;
this.fsn = fsn;
this.limitPerRun = limitPerRun;
this.nextCountLimit = limitPerRun;
this.counts = Content.Counts.newInstance();
}
/** Constructor for blocking computation. */
public ContentSummaryComputationContext() {
this(null, null, 0);
}
/** Return current yield count */
public long getYieldCount() {
return yieldCount;
}
/**
* Relinquish locks held during computation for a short while
* and reacquire them. This will give other threads a chance
* to acquire the contended locks and run.
*
* @return true if locks were released and reacquired.
*/
public boolean yield() {
// Are we set up to do this?
if (limitPerRun <= 0 || dir == null || fsn == null) {
return false;
}
// Have we reached the limit?
long currentCount = counts.get(Content.FILE) +
counts.get(Content.SYMLINK) +
counts.get(Content.DIRECTORY) +
counts.get(Content.SNAPSHOTTABLE_DIRECTORY);
if (currentCount <= nextCountLimit) {
return false;
}
// Update the next limit
nextCountLimit = currentCount + limitPerRun;
boolean hadDirReadLock = dir.hasReadLock();
boolean hadDirWriteLock = dir.hasWriteLock();
boolean hadFsnReadLock = fsn.hasReadLock();
boolean hadFsnWriteLock = fsn.hasWriteLock();
// sanity check.
if (!hadDirReadLock || !hadFsnReadLock || hadDirWriteLock ||
hadFsnWriteLock || dir.getReadHoldCount() != 1 ||
fsn.getReadHoldCount() != 1) {
// cannot relinquish
return false;
}
// unlock
dir.readUnlock();
fsn.readUnlock();
try {
Thread.sleep(1);
} catch (InterruptedException ie) {
} finally {
// reacquire
fsn.readLock();
dir.readLock();
}
yieldCount++;
return true;
}
/** Get the content counts */
public Content.Counts getCounts() {
return counts;
}
}

View File

@ -112,7 +112,9 @@ public class FSDirectory implements Closeable {
private final int maxComponentLength; private final int maxComponentLength;
private final int maxDirItems; private final int maxDirItems;
private final int lsLimit; // max list limit private final int lsLimit; // max list limit
private final int contentCountLimit; // max content summary counts per run
private final INodeMap inodeMap; // Synchronized by dirLock private final INodeMap inodeMap; // Synchronized by dirLock
private long yieldCount = 0; // keep track of lock yield count.
// lock to protect the directory and BlockMap // lock to protect the directory and BlockMap
private ReentrantReadWriteLock dirLock; private ReentrantReadWriteLock dirLock;
@ -143,6 +145,14 @@ public class FSDirectory implements Closeable {
return this.dirLock.getReadHoldCount() > 0; return this.dirLock.getReadHoldCount() > 0;
} }
public int getReadHoldCount() {
return this.dirLock.getReadHoldCount();
}
public int getWriteHoldCount() {
return this.dirLock.getWriteHoldCount();
}
/** /**
* Caches frequently used file names used in {@link INode} to reuse * Caches frequently used file names used in {@link INode} to reuse
* byte[] objects and reduce heap usage. * byte[] objects and reduce heap usage.
@ -159,6 +169,10 @@ public class FSDirectory implements Closeable {
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
this.lsLimit = configuredLimit>0 ? this.lsLimit = configuredLimit>0 ?
configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT; configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
this.contentCountLimit = conf.getInt(
DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY,
DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_DEFAULT);
// filesystem limits // filesystem limits
this.maxComponentLength = conf.getInt( this.maxComponentLength = conf.getInt(
@ -2294,13 +2308,26 @@ public class FSDirectory implements Closeable {
throw new FileNotFoundException("File does not exist: " + srcs); throw new FileNotFoundException("File does not exist: " + srcs);
} }
else { else {
return targetNode.computeContentSummary(); // Make it relinquish locks everytime contentCountLimit entries are
// processed. 0 means disabled. I.e. blocking for the entire duration.
ContentSummaryComputationContext cscc =
new ContentSummaryComputationContext(this, getFSNamesystem(),
contentCountLimit);
ContentSummary cs = targetNode.computeAndConvertContentSummary(cscc);
yieldCount += cscc.getYieldCount();
return cs;
} }
} finally { } finally {
readUnlock(); readUnlock();
} }
} }
@VisibleForTesting
public long getYieldCount() {
return yieldCount;
}
public INodeMap getINodeMap() { public INodeMap getINodeMap() {
return inodeMap; return inodeMap;
} }

View File

@ -1272,6 +1272,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return hasReadLock() || hasWriteLock(); return hasReadLock() || hasWriteLock();
} }
public int getReadHoldCount() {
return this.fsLock.getReadHoldCount();
}
public int getWriteHoldCount() {
return this.fsLock.getWriteHoldCount();
}
NamespaceInfo getNamespaceInfo() { NamespaceInfo getNamespaceInfo() {
readLock(); readLock();
try { try {

View File

@ -371,10 +371,18 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
public abstract void destroyAndCollectBlocks( public abstract void destroyAndCollectBlocks(
BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes); BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes);
/** Compute {@link ContentSummary}. */ /** Compute {@link ContentSummary}. Blocking call */
public final ContentSummary computeContentSummary() { public final ContentSummary computeContentSummary() {
final Content.Counts counts = computeContentSummary( return computeAndConvertContentSummary(
Content.Counts.newInstance()); new ContentSummaryComputationContext());
}
/**
* Compute {@link ContentSummary}.
*/
public final ContentSummary computeAndConvertContentSummary(
ContentSummaryComputationContext summary) {
Content.Counts counts = computeContentSummary(summary).getCounts();
return new ContentSummary(counts.get(Content.LENGTH), return new ContentSummary(counts.get(Content.LENGTH),
counts.get(Content.FILE) + counts.get(Content.SYMLINK), counts.get(Content.FILE) + counts.get(Content.SYMLINK),
counts.get(Content.DIRECTORY), getNsQuota(), counts.get(Content.DIRECTORY), getNsQuota(),
@ -384,10 +392,12 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
/** /**
* Count subtree content summary with a {@link Content.Counts}. * Count subtree content summary with a {@link Content.Counts}.
* *
* @param counts The subtree counts for returning. * @param summary the context object holding counts for the subtree.
* @return The same objects as the counts parameter. * @return The same objects as summary.
*/ */
public abstract Content.Counts computeContentSummary(Content.Counts counts); public abstract ContentSummaryComputationContext computeContentSummary(
ContentSummaryComputationContext summary);
/** /**
* Check and add namespace/diskspace consumed to itself and the ancestors. * Check and add namespace/diskspace consumed to itself and the ancestors.

View File

@ -466,12 +466,45 @@ public class INodeDirectory extends INodeWithAdditionalFields
} }
@Override @Override
public Content.Counts computeContentSummary(final Content.Counts counts) { public ContentSummaryComputationContext computeContentSummary(
for (INode child : getChildrenList(null)) { ContentSummaryComputationContext summary) {
child.computeContentSummary(counts); ReadOnlyList<INode> childrenList = getChildrenList(null);
// Explicit traversing is done to enable repositioning after relinquishing
// and reacquiring locks.
for (int i = 0; i < childrenList.size(); i++) {
INode child = childrenList.get(i);
byte[] childName = child.getLocalNameBytes();
long lastYieldCount = summary.getYieldCount();
child.computeContentSummary(summary);
// Check whether the computation was paused in the subtree.
// The counts may be off, but traversing the rest of children
// should be made safe.
if (lastYieldCount == summary.getYieldCount()) {
continue;
}
// The locks were released and reacquired. Check parent first.
if (getParent() == null) {
// Stop further counting and return whatever we have so far.
break;
}
// Obtain the children list again since it may have been modified.
childrenList = getChildrenList(null);
// Reposition in case the children list is changed. Decrement by 1
// since it will be incremented when loops.
i = nextChild(childrenList, childName) - 1;
} }
counts.add(Content.DIRECTORY, 1);
return counts; // Increment the directory count for this directory.
summary.getCounts().add(Content.DIRECTORY, 1);
// Relinquish and reacquire locks if necessary.
summary.yield();
return summary;
} }
/** /**

View File

@ -107,12 +107,16 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
} }
@Override @Override
public Content.Counts computeContentSummary( public ContentSummaryComputationContext computeContentSummary(
final Content.Counts counts) { final ContentSummaryComputationContext summary) {
final long original = counts.get(Content.DISKSPACE); final long original = summary.getCounts().get(Content.DISKSPACE);
super.computeContentSummary(counts); long oldYieldCount = summary.getYieldCount();
checkDiskspace(counts.get(Content.DISKSPACE) - original); super.computeContentSummary(summary);
return counts; // Check only when the content has not changed in the middle.
if (oldYieldCount == summary.getYieldCount()) {
checkDiskspace(summary.getCounts().get(Content.DISKSPACE) - original);
}
return summary;
} }
private void checkDiskspace(final long computed) { private void checkDiskspace(final long computed) {

View File

@ -344,11 +344,11 @@ public class INodeFile extends INodeWithAdditionalFields
} }
@Override @Override
public final Content.Counts computeContentSummary( public final ContentSummaryComputationContext computeContentSummary(
final Content.Counts counts) { final ContentSummaryComputationContext summary) {
computeContentSummary4Snapshot(counts); computeContentSummary4Snapshot(summary.getCounts());
computeContentSummary4Current(counts); computeContentSummary4Current(summary.getCounts());
return counts; return summary;
} }
private void computeContentSummary4Snapshot(final Content.Counts counts) { private void computeContentSummary4Snapshot(final Content.Counts counts) {

View File

@ -107,7 +107,8 @@ public class INodeMap {
} }
@Override @Override
public Content.Counts computeContentSummary(Content.Counts counts) { public ContentSummaryComputationContext computeContentSummary(
ContentSummaryComputationContext summary) {
return null; return null;
} }

View File

@ -278,8 +278,9 @@ public abstract class INodeReference extends INode {
} }
@Override @Override
public Content.Counts computeContentSummary(Content.Counts counts) { public ContentSummaryComputationContext computeContentSummary(
return referred.computeContentSummary(counts); ContentSummaryComputationContext summary) {
return referred.computeContentSummary(summary);
} }
@Override @Override
@ -444,12 +445,13 @@ public abstract class INodeReference extends INode {
} }
@Override @Override
public final Content.Counts computeContentSummary(Content.Counts counts) { public final ContentSummaryComputationContext computeContentSummary(
ContentSummaryComputationContext summary) {
//only count diskspace for WithName //only count diskspace for WithName
final Quota.Counts q = Quota.Counts.newInstance(); final Quota.Counts q = Quota.Counts.newInstance();
computeQuotaUsage(q, false, lastSnapshotId); computeQuotaUsage(q, false, lastSnapshotId);
counts.add(Content.DISKSPACE, q.get(Quota.DISKSPACE)); summary.getCounts().add(Content.DISKSPACE, q.get(Quota.DISKSPACE));
return counts; return summary;
} }
@Override @Override
@ -688,4 +690,4 @@ public abstract class INodeReference extends INode {
} }
} }
} }
} }

View File

@ -98,9 +98,10 @@ public class INodeSymlink extends INodeWithAdditionalFields {
} }
@Override @Override
public Content.Counts computeContentSummary(final Content.Counts counts) { public ContentSummaryComputationContext computeContentSummary(
counts.add(Content.SYMLINK, 1); final ContentSummaryComputationContext summary) {
return counts; summary.getCounts().add(Content.SYMLINK, 1);
return summary;
} }
@Override @Override

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import org.apache.hadoop.hdfs.server.namenode.Content; import org.apache.hadoop.hdfs.server.namenode.Content;
import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory; import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@ -342,11 +343,12 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
} }
@Override @Override
public Content.Counts computeContentSummary(final Content.Counts counts) { public ContentSummaryComputationContext computeContentSummary(
super.computeContentSummary(counts); final ContentSummaryComputationContext summary) {
counts.add(Content.SNAPSHOT, snapshotsByNames.size()); super.computeContentSummary(summary);
counts.add(Content.SNAPSHOTTABLE_DIRECTORY, 1); summary.getCounts().add(Content.SNAPSHOT, snapshotsByNames.size());
return counts; summary.getCounts().add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
return summary;
} }
/** /**

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import org.apache.hadoop.hdfs.server.namenode.Content; import org.apache.hadoop.hdfs.server.namenode.Content;
import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization; import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory; import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
@ -883,18 +884,27 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
} }
@Override @Override
public Content.Counts computeContentSummary(final Content.Counts counts) { public ContentSummaryComputationContext computeContentSummary(
super.computeContentSummary(counts); final ContentSummaryComputationContext summary) {
computeContentSummary4Snapshot(counts); // Snapshot summary calc won't be relinquishing locks in the middle.
return counts; // Do this first and handover to parent.
computeContentSummary4Snapshot(summary.getCounts());
super.computeContentSummary(summary);
return summary;
} }
private void computeContentSummary4Snapshot(final Content.Counts counts) { private void computeContentSummary4Snapshot(final Content.Counts counts) {
// Create a new blank summary context for blocking processing of subtree.
ContentSummaryComputationContext summary =
new ContentSummaryComputationContext();
for(DirectoryDiff d : diffs) { for(DirectoryDiff d : diffs) {
for(INode deleted : d.getChildrenDiff().getList(ListType.DELETED)) { for(INode deleted : d.getChildrenDiff().getList(ListType.DELETED)) {
deleted.computeContentSummary(counts); deleted.computeContentSummary(summary);
} }
} }
// Add the counts from deleted trees.
counts.add(summary.getCounts());
// Add the deleted directory count.
counts.add(Content.DIRECTORY, diffs.asList().size()); counts.add(Content.DIRECTORY, diffs.asList().size());
} }

View File

@ -84,6 +84,9 @@ public class TestQuota {
// Space quotas // Space quotas
final int DEFAULT_BLOCK_SIZE = 512; final int DEFAULT_BLOCK_SIZE = 512;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
// Make it relinquish locks. When run serially, the result should
// be identical.
conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
final FileSystem fs = cluster.getFileSystem(); final FileSystem fs = cluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(), assertTrue("Not a HDFS: "+fs.getUri(),
@ -348,6 +351,7 @@ public class TestQuota {
} }
assertTrue(hasException); assertTrue(hasException);
assertEquals(4, cluster.getNamesystem().getFSDirectory().getYieldCount());
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }
@ -358,6 +362,9 @@ public class TestQuota {
@Test @Test
public void testNamespaceCommands() throws Exception { public void testNamespaceCommands() throws Exception {
final Configuration conf = new HdfsConfiguration(); final Configuration conf = new HdfsConfiguration();
// Make it relinquish locks. When run serially, the result should
// be identical.
conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
final FileSystem fs = cluster.getFileSystem(); final FileSystem fs = cluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(), assertTrue("Not a HDFS: "+fs.getUri(),
@ -513,6 +520,7 @@ public class TestQuota {
c = dfs.getContentSummary(quotaDir1); c = dfs.getContentSummary(quotaDir1);
assertEquals(c.getDirectoryCount(), 6); assertEquals(c.getDirectoryCount(), 6);
assertEquals(c.getQuota(), 6); assertEquals(c.getQuota(), 6);
assertEquals(14, cluster.getNamesystem().getFSDirectory().getYieldCount());
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }
@ -530,6 +538,9 @@ public class TestQuota {
// set a smaller block size so that we can test with smaller // set a smaller block size so that we can test with smaller
// diskspace quotas // diskspace quotas
conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512"); conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512");
// Make it relinquish locks. When run serially, the result should
// be identical.
conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
final FileSystem fs = cluster.getFileSystem(); final FileSystem fs = cluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(), assertTrue("Not a HDFS: "+fs.getUri(),
@ -762,6 +773,7 @@ public class TestQuota {
assertEquals(c.getSpaceConsumed(), assertEquals(c.getSpaceConsumed(),
(sizeFactorA + sizeFactorB + sizeFactorC) * fileSpace); (sizeFactorA + sizeFactorB + sizeFactorC) * fileSpace);
assertEquals(20, cluster.getNamesystem().getFSDirectory().getYieldCount());
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }
@ -846,6 +858,9 @@ public class TestQuota {
final int BLOCK_SIZE = 6 * 1024; final int BLOCK_SIZE = 6 * 1024;
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
// Make it relinquish locks. When run serially, the result should
// be identical.
conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
MiniDFSCluster cluster = MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive(); cluster.waitActive();
@ -904,6 +919,7 @@ public class TestQuota {
exceededQuota = true; exceededQuota = true;
} }
assertTrue("Quota not exceeded", exceededQuota); assertTrue("Quota not exceeded", exceededQuota);
assertEquals(2, cluster.getNamesystem().getFSDirectory().getYieldCount());
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }