svn merge -c 1560750 merging from trunk to branch-2 to fix: HDFS-5788. listLocatedStatus response can be very large. Contributed by Nathan Roberts.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1560778 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee 2014-01-23 18:13:17 +00:00
parent 7bbd687d4d
commit 95e3ef2ad4
3 changed files with 97 additions and 4 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -212,6 +212,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5434. Change block placement policy constructors from package private
to protected. (Buddy Taylor via Arpit Agarwal)
HDFS-5788. listLocatedStatus response can be very large. (Nathan Roberts
via kihwal)
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -171,7 +171,6 @@ public class FSDirectory implements Closeable {
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
this.lsLimit = configuredLimit>0 ?
configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
this.contentCountLimit = conf.getInt(
DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY,
DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_DEFAULT);
@@ -1532,6 +1531,11 @@ public class FSDirectory implements Closeable {
/**
* Get a partial listing of the indicated directory
*
* We will stop when any of the following conditions is met:
* 1) this.lsLimit files have been added
* 2) needLocation is true AND enough files have been added such
* that at least this.lsLimit block locations are in the response
*
* @param src the directory name
* @param startAfter the name to start listing after
* @param needLocation if block locations are returned
@@ -1563,14 +1567,30 @@ public class FSDirectory implements Closeable {
int startChild = INodeDirectory.nextChild(contents, startAfter);
int totalNumChildren = contents.size();
int numOfListing = Math.min(totalNumChildren-startChild, this.lsLimit);
int locationBudget = this.lsLimit;
int listingCnt = 0;
HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
for (int i=0; i<numOfListing && locationBudget>0; i++) {
INode cur = contents.get(startChild+i);
listing[i] = createFileStatus(cur.getLocalNameBytes(), cur,
needLocation, snapshot);
listingCnt++;
if (needLocation) {
// Once we hit lsLimit locations, stop.
// This helps to prevent excessively large response payloads.
// Approximate #locations with locatedBlockCount() * repl_factor
LocatedBlocks blks =
((HdfsLocatedFileStatus)listing[i]).getBlockLocations();
locationBudget -= (blks == null) ? 0 :
blks.locatedBlockCount() * listing[i].getReplication();
}
}
// truncate return array if necessary
if (listingCnt < numOfListing) {
listing = Arrays.copyOf(listing, listingCnt);
}
return new DirectoryListing(
listing, totalNumChildren-startChild-listingCnt);
} finally {
readUnlock();
}
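Taken together, the hunk above bounds the size of a located-listing response: in addition to the existing lsLimit cap on the number of entries, every entry that carries block locations now charges roughly locatedBlockCount() * replication against a budget of lsLimit, and the loop stops and the result array is truncated once that budget is spent. The following is a minimal, self-contained sketch of that accounting only, using made-up stand-in types (Entry, listWithBudget) rather than the real HdfsFileStatus and LocatedBlocks classes:

import java.util.Arrays;

/**
 * Illustrative sketch, not the FSDirectory code itself: shows how a
 * per-response "location budget" truncates a directory listing once
 * roughly lsLimit block locations have been accumulated.
 */
public class LocationBudgetSketch {
  // Stand-in for a child entry; blockCount * replication approximates
  // locatedBlockCount() * getReplication() in the real code.
  static class Entry {
    final String name;
    final int blockCount;
    final short replication;
    Entry(String name, int blockCount, short replication) {
      this.name = name;
      this.blockCount = blockCount;
      this.replication = replication;
    }
  }

  /** Return the names included in a single listing response. */
  static String[] listWithBudget(Entry[] children, int lsLimit) {
    int numOfListing = Math.min(children.length, lsLimit);
    int locationBudget = lsLimit;
    int listingCnt = 0;
    String[] listing = new String[numOfListing];
    for (int i = 0; i < numOfListing && locationBudget > 0; i++) {
      listing[i] = children[i].name;
      listingCnt++;
      // Charge this entry's approximate number of locations to the budget.
      locationBudget -= children[i].blockCount * children[i].replication;
    }
    // Truncate if the budget ran out before lsLimit entries were added.
    return listingCnt < numOfListing ? Arrays.copyOf(listing, listingCnt)
                                     : listing;
  }

  public static void main(String[] args) {
    Entry[] children = {
      new Entry("f1", 3, (short) 3), // 9 locations
      new Entry("f2", 3, (short) 3), // 9 locations
      new Entry("f3", 3, (short) 3)  // 9 locations
    };
    // With lsLimit = 9 only one entry fits per response, matching the
    // first half of the new test below.
    System.out.println(Arrays.toString(listWithBudget(children, 9))); // [f1]
  }
}

Callers are unaffected by the truncation because each response still reports how many entries remain and the last name returned, so the next page simply starts after that cursor.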

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java

@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -931,6 +932,75 @@ public class TestINodeFile {
}
}
}
@Test
public void testLocationLimitInListingOps() throws Exception {
final Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 9); // 3 blocks * 3 replicas
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
final DistributedFileSystem hdfs = cluster.getFileSystem();
ArrayList<String> source = new ArrayList<String>();
// tmp1 holds files with 3 blocks, 3 replicas
// tmp2 holds files with 3 blocks, 1 replica
hdfs.mkdirs(new Path("/tmp1"));
hdfs.mkdirs(new Path("/tmp2"));
source.add("f1");
source.add("f2");
int numEntries = source.size();
for (int j=0;j<numEntries;j++) {
DFSTestUtil.createFile(hdfs, new Path("/tmp1/"+source.get(j)), 4096,
3*1024-100, 1024, (short) 3, 0);
}
byte[] start = HdfsFileStatus.EMPTY_NAME;
for (int j=0;j<numEntries;j++) {
DirectoryListing dl = cluster.getNameNodeRpc().getListing("/tmp1",
start, true);
assertTrue(dl.getPartialListing().length == 1);
for (int i=0;i<dl.getPartialListing().length; i++) {
source.remove(dl.getPartialListing()[i].getLocalName());
}
start = dl.getLastName();
}
// Verify we have listed all entries in the directory.
assertTrue(source.size() == 0);
// Now create 6 files, each with 3 locations. Should take 2 iterations of 3
source.add("f1");
source.add("f2");
source.add("f3");
source.add("f4");
source.add("f5");
source.add("f6");
numEntries = source.size();
for (int j=0;j<numEntries;j++) {
DFSTestUtil.createFile(hdfs, new Path("/tmp2/"+source.get(j)), 4096,
3*1024-100, 1024, (short) 1, 0);
}
start = HdfsFileStatus.EMPTY_NAME;
for (int j=0;j<numEntries/3;j++) {
DirectoryListing dl = cluster.getNameNodeRpc().getListing("/tmp2",
start, true);
assertTrue(dl.getPartialListing().length == 3);
for (int i=0;i<dl.getPartialListing().length; i++) {
source.remove(dl.getPartialListing()[i].getLocalName());
}
start = dl.getLastName();
}
// Verify we have listed all entries in tmp2.
assertTrue(source.size() == 0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
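The loops in the new test above page through the directory exactly the way a caller would: start with an empty name, collect the partial listing, and continue from getLastName() until no entries remain. A hedged client-side sketch of that pattern, assuming an already-obtained ClientProtocol handle (for example cluster.getNameNodeRpc() as used in the test), might look like:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

public class ListingPager {
  /**
   * Sketch of cursor-based paging: keep calling getListing with the last
   * name returned until the namenode reports no remaining entries. The
   * namenode decides how many entries fit into each response, so the
   * caller never needs to know lsLimit or the new location budget.
   */
  static List<String> listAll(ClientProtocol namenode, String dir)
      throws IOException {
    List<String> names = new ArrayList<String>();
    byte[] start = HdfsFileStatus.EMPTY_NAME;
    while (true) {
      DirectoryListing dl = namenode.getListing(dir, start, true);
      if (dl == null) { // directory does not exist
        break;
      }
      for (HdfsFileStatus stat : dl.getPartialListing()) {
        names.add(stat.getLocalName());
      }
      if (!dl.hasMore()) {
        break;
      }
      start = dl.getLastName();
    }
    return names;
  }
}

Applications would normally not talk to ClientProtocol directly; FileSystem#listStatus and FileSystem#listLocatedStatus perform this paging internally, which is why the change is transparent to existing callers.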
@Test
public void testFilesInGetListingOps() throws Exception {