HDFS-16986. EC: Fix locationBudget in getListing(). (#5582). Contributed by Shuyan Zhang.
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
5b23224970
commit
6a23c376c9
|
@ -262,13 +262,24 @@ class FSDirStatAndListingOp {
|
||||||
needLocation, false);
|
needLocation, false);
|
||||||
listingCnt++;
|
listingCnt++;
|
||||||
if (listing[i] instanceof HdfsLocatedFileStatus) {
|
if (listing[i] instanceof HdfsLocatedFileStatus) {
|
||||||
// Once we hit lsLimit locations, stop.
|
// Once we hit lsLimit locations, stop.
|
||||||
// This helps to prevent excessively large response payloads.
|
// This helps to prevent excessively large response payloads.
|
||||||
// Approximate #locations with locatedBlockCount() * repl_factor
|
LocatedBlocks blks =
|
||||||
LocatedBlocks blks =
|
((HdfsLocatedFileStatus) listing[i]).getLocatedBlocks();
|
||||||
((HdfsLocatedFileStatus)listing[i]).getLocatedBlocks();
|
if (blks != null) {
|
||||||
locationBudget -= (blks == null) ? 0 :
|
ErasureCodingPolicy ecPolicy = listing[i].getErasureCodingPolicy();
|
||||||
blks.locatedBlockCount() * listing[i].getReplication();
|
if (ecPolicy != null && !ecPolicy.isReplicationPolicy()) {
|
||||||
|
// Approximate #locations with locatedBlockCount() *
|
||||||
|
// internalBlocksNum.
|
||||||
|
locationBudget -= blks.locatedBlockCount() *
|
||||||
|
(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
|
||||||
|
} else {
|
||||||
|
// Approximate #locations with locatedBlockCount() *
|
||||||
|
// replicationFactor.
|
||||||
|
locationBudget -=
|
||||||
|
blks.locatedBlockCount() * listing[i].getReplication();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// truncate return array if necessary
|
// truncate return array if necessary
|
||||||
|
|
|
@ -29,6 +29,9 @@ import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.inOrder;
|
import static org.mockito.Mockito.inOrder;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -674,6 +677,44 @@ public class TestDistributedFileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is to test that {@link DFSConfigKeys#DFS_LIST_LIMIT} works as
|
||||||
|
* expected when {@link DistributedFileSystem#listLocatedStatus} is called.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetListingLimit() throws Exception {
|
||||||
|
final Configuration conf = getTestConfiguration();
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 9);
|
||||||
|
try (MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(9).build()) {
|
||||||
|
cluster.waitActive();
|
||||||
|
ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
|
||||||
|
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
fs.dfs = spy(fs.dfs);
|
||||||
|
Path dir1 = new Path("/testRep");
|
||||||
|
Path dir2 = new Path("/testEC");
|
||||||
|
fs.mkdirs(dir1);
|
||||||
|
fs.mkdirs(dir2);
|
||||||
|
fs.setErasureCodingPolicy(dir2, ecPolicy.getName());
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
DFSTestUtil.createFile(fs, new Path(dir1, String.valueOf(i)),
|
||||||
|
20 * 1024L, (short) 3, 1);
|
||||||
|
DFSTestUtil.createStripedFile(cluster, new Path(dir2,
|
||||||
|
String.valueOf(i)), dir2, 1, 1, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<LocatedFileStatus> str = RemoteIterators.toList(fs.listLocatedStatus(dir1));
|
||||||
|
assertThat(str).hasSize(3);
|
||||||
|
Mockito.verify(fs.dfs, Mockito.times(1)).listPaths(anyString(), any(),
|
||||||
|
anyBoolean());
|
||||||
|
|
||||||
|
str = RemoteIterators.toList(fs.listLocatedStatus(dir2));
|
||||||
|
assertThat(str).hasSize(3);
|
||||||
|
Mockito.verify(fs.dfs, Mockito.times(4)).listPaths(anyString(), any(),
|
||||||
|
anyBoolean());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStatistics() throws IOException {
|
public void testStatistics() throws IOException {
|
||||||
FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
|
FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
|
||||||
|
|
Loading…
Reference in New Issue