HDFS-14908. LeaseManager should check parent-child relationship when filter open files. Contributed by Jinglun.

parent 578bd101a6
commit 24080666e5
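Before this commit the open-file filter matched paths with a plain string prefix test, so a filter such as /base also matched files under the sibling directory /base-open. A minimal standalone Java sketch of that false match (illustrative names, not HDFS code):

public class PrefixMatchBug {
  public static void main(String[] args) {
    String filter = "/base";
    String openFile = "/base-open/open-1";
    // Prints "true": a bare prefix test accepts /base-open/open-1
    // even though /base-open is a sibling of /base, not a child.
    System.out.println(openFile.startsWith(filter));
  }
}

The hunks below fix this by normalizing the filter path and delegating the match to DFSUtil.isParentEntry.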
@@ -1826,16 +1826,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.READ);
     BatchedListEntries<OpenFileEntry> batchedListEntries;
+    String normalizedPath = new Path(path).toString(); // normalize path.
     try {
       readLock();
       try {
         checkOperation(OperationCategory.READ);
         if (openFilesTypes.contains(OpenFilesType.ALL_OPEN_FILES)) {
           batchedListEntries = leaseManager.getUnderConstructionFiles(prevId,
-              path);
+              normalizedPath);
         } else {
           if (openFilesTypes.contains(OpenFilesType.BLOCKING_DECOMMISSION)) {
-            batchedListEntries = getFilesBlockingDecom(prevId, path);
+            batchedListEntries = getFilesBlockingDecom(prevId, normalizedPath);
           } else {
             throw new IllegalArgumentException("Unknown OpenFileType: "
                 + openFilesTypes);
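The added normalizedPath line runs the user-supplied filter through org.apache.hadoop.fs.Path, whose string form drops trailing and doubled separators; this is why the new test below can pass both "/base" and "/base/" as equivalent filters. A small sketch, assuming only hadoop-common on the classpath:

import org.apache.hadoop.fs.Path;

public class NormalizeFilter {
  public static void main(String[] args) {
    // Path normalizes its string form on construction.
    System.out.println(new Path("/base/"));   // prints /base
    System.out.println(new Path("/base//f")); // prints /base/f
  }
}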
@@ -1874,7 +1875,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
       String fullPathName = inodeFile.getFullPathName();
       if (org.apache.commons.lang3.StringUtils.isEmpty(path)
-          || fullPathName.startsWith(path)) {
+          || DFSUtil.isParentEntry(fullPathName, path)) {
         openFileEntries.add(new OpenFileEntry(inodeFile.getId(),
             inodeFile.getFullPathName(),
             inodeFile.getFileUnderConstructionFeature().getClientName(),
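Here, and in the LeaseManager hunk below, the prefix test is replaced with DFSUtil.isParentEntry. The diff does not include DFSUtil itself, so the following is a hedged sketch of the parent-entry semantics the call sites rely on (equality, or a prefix match that ends on a path-separator boundary), not the actual DFSUtil implementation:

public class ParentEntryCheck {
  /** Illustrative only: true if path equals parent or lies under it. */
  static boolean isParentEntry(String path, String parent) {
    if (!path.startsWith(parent)) {
      return false;
    }
    // Equal paths, a root parent, or a separator right after the
    // matched prefix indicate a genuine parent-child relationship.
    return path.length() == parent.length()
        || "/".equals(parent)
        || path.charAt(parent.length()) == '/';
  }

  public static void main(String[] args) {
    System.out.println(isParentEntry("/base/open-1", "/base"));      // true
    System.out.println(isParentEntry("/base-open/open-1", "/base")); // false
  }
}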
@@ -42,6 +42,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
@@ -315,7 +316,8 @@ public class LeaseManager {
       }
 
       fullPathName = inodeFile.getFullPathName();
-      if (StringUtils.isEmpty(path) || fullPathName.startsWith(path)) {
+      if (StringUtils.isEmpty(path) ||
+          DFSUtil.isParentEntry(fullPathName, path)) {
         openFileEntries.add(new OpenFileEntry(inodeFile.getId(), fullPathName,
             inodeFile.getFileUnderConstructionFeature().getClientName(),
             inodeFile.getFileUnderConstructionFeature().getClientMachine()));
@@ -2392,13 +2392,32 @@ public class DFSTestUtil {
     }
   }
 
+  /**
+   * Create open files under root path.
+   * @param fs the filesystem.
+   * @param filePrefix the prefix of the files.
+   * @param numFilesToCreate the number of files to create.
+   */
   public static Map<Path, FSDataOutputStream> createOpenFiles(FileSystem fs,
       String filePrefix, int numFilesToCreate) throws IOException {
+    return createOpenFiles(fs, new Path("/"), filePrefix, numFilesToCreate);
+  }
+
+  /**
+   * Create open files.
+   * @param fs the filesystem.
+   * @param baseDir the base path of the files.
+   * @param filePrefix the prefix of the files.
+   * @param numFilesToCreate the number of files to create.
+   */
+  public static Map<Path, FSDataOutputStream> createOpenFiles(FileSystem fs,
+      Path baseDir, String filePrefix, int numFilesToCreate)
+      throws IOException {
     final Map<Path, FSDataOutputStream> filesCreated = new HashMap<>();
     final byte[] buffer = new byte[(int) (1024 * 1.75)];
     final Random rand = new Random(0xFEED0BACL);
     for (int i = 0; i < numFilesToCreate; i++) {
-      Path file = new Path("/" + filePrefix + "-" + i);
+      Path file = new Path(baseDir, filePrefix + "-" + i);
       FSDataOutputStream stm = fs.create(file, true, 1024, (short) 1, 1024);
       rand.nextBytes(buffer);
       stm.write(buffer);
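A sketch of how the new baseDir overload pairs with the existing DFSTestUtil.closeOpenFiles helper; the class name, the "demo" prefix, and the single-node cluster setup are assumptions for illustration:

import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class CreateOpenFilesDemo {
  public static void main(String[] args) throws Exception {
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(new Configuration()).build();
    try {
      FileSystem fs = cluster.getFileSystem();
      // Open two files under /base via the overload added above.
      HashMap<Path, FSDataOutputStream> open = new HashMap<>();
      open.putAll(DFSTestUtil.createOpenFiles(fs, new Path("/base"), "demo", 2));
      // Close them all with the existing helper.
      DFSTestUtil.closeOpenFiles(open, open.size());
    } finally {
      cluster.shutdown();
    }
  }
}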
@@ -157,13 +157,22 @@ public class TestListOpenFiles {
         remainingFiles.size() == 0);
   }
 
+  /**
+   * Verify all open files.
+   */
   private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles)
       throws IOException {
-    verifyOpenFiles(openFiles, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
-        OpenFilesIterator.FILTER_PATH_DEFAULT);
+    verifyOpenFiles(openFiles, OpenFilesIterator.FILTER_PATH_DEFAULT);
+  }
+
+  /**
+   * Verify open files with specified filter path.
+   */
+  private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles,
+      String path) throws IOException {
+    verifyOpenFiles(openFiles, EnumSet.of(OpenFilesType.ALL_OPEN_FILES), path);
     verifyOpenFiles(new HashMap<>(),
-        EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION),
-        OpenFilesIterator.FILTER_PATH_DEFAULT);
+        EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION), path);
   }
 
   private Set<Path> createFiles(FileSystem fileSystem, String fileNamePrefix,
@@ -255,4 +264,35 @@ public class TestListOpenFiles {
       }
     }
   }
+
+  @Test(timeout = 120000)
+  public void testListOpenFilesWithFilterPath() throws IOException {
+    HashMap<Path, FSDataOutputStream> openFiles = new HashMap<>();
+    createFiles(fs, "closed", 10);
+    verifyOpenFiles(openFiles, OpenFilesIterator.FILTER_PATH_DEFAULT);
+
+    BatchedEntries<OpenFileEntry> openFileEntryBatchedEntries = nnRpc
+        .listOpenFiles(0, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+            OpenFilesIterator.FILTER_PATH_DEFAULT);
+    assertTrue("Open files list should be empty!",
+        openFileEntryBatchedEntries.size() == 0);
+    BatchedEntries<OpenFileEntry> openFilesBlockingDecomEntries = nnRpc
+        .listOpenFiles(0, EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION),
+            OpenFilesIterator.FILTER_PATH_DEFAULT);
+    assertTrue("Open files list blocking decommission should be empty!",
+        openFilesBlockingDecomEntries.size() == 0);
+
+    openFiles.putAll(
+        DFSTestUtil.createOpenFiles(fs, new Path("/base"), "open-1", 1));
+    Map<Path, FSDataOutputStream> baseOpen =
+        DFSTestUtil.createOpenFiles(fs, new Path("/base-open"), "open-1", 1);
+    verifyOpenFiles(openFiles, "/base");
+    verifyOpenFiles(openFiles, "/base/");
+
+    openFiles.putAll(baseOpen);
+    while (openFiles.size() > 0) {
+      DFSTestUtil.closeOpenFiles(openFiles, 1);
+      verifyOpenFiles(openFiles, OpenFilesIterator.FILTER_PATH_DEFAULT);
+    }
+  }
 }