HDFS-11848. Enhance dfsadmin listOpenFiles command to list files under a given path. Contributed by Yiqun Lin.

Yiqun Lin 2018-01-06 14:31:08 +08:00
parent 836e3c45e8
commit bf5c948995
19 changed files with 249 additions and 54 deletions
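In short, the patch threads an optional path filter from the dfsadmin CLI through ClientProtocol down to the NameNode's LeaseManager, so that "hdfs dfsadmin -listOpenFiles -path <path>" reports only open files whose full path starts with the given path; the default filter is "/", which matches everything. For reference, a minimal client-side sketch of the new Java overload added below; the cast and the fs.defaultFS assumption are illustrative only, and listing open files still requires HDFS superuser privilege:

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
    import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;

    public class ListOpenFilesUnderPath {
      public static void main(String[] args) throws Exception {
        // Assumes fs.defaultFS points at the target HDFS cluster and the
        // caller is the HDFS superuser.
        Configuration conf = new Configuration();
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(conf);

        // New overload from this commit: filter open files by path prefix.
        RemoteIterator<OpenFileEntry> it = dfs.listOpenFiles(
            EnumSet.of(OpenFilesType.ALL_OPEN_FILES), "/tmp/files/a");
        while (it.hasNext()) {
          System.out.println(it.next().getFilePath());
        }
      }
    }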


@@ -3088,11 +3088,26 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   @Deprecated
   public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
     checkOpen();
-    return listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+    return listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+        OpenFilesIterator.FILTER_PATH_DEFAULT);
   }
 
   /**
-   * Get a remote iterator to the open files list by type, managed by NameNode.
+   * Get a remote iterator to the open files list by path,
+   * managed by NameNode.
+   *
+   * @param path
+   * @throws IOException
+   */
+  public RemoteIterator<OpenFileEntry> listOpenFiles(String path)
+      throws IOException {
+    checkOpen();
+    return listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES), path);
+  }
+
+  /**
+   * Get a remote iterator to the open files list by type,
+   * managed by NameNode.
    *
    * @param openFilesTypes
    * @throws IOException
@@ -3100,6 +3115,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public RemoteIterator<OpenFileEntry> listOpenFiles(
       EnumSet<OpenFilesType> openFilesTypes) throws IOException {
     checkOpen();
-    return new OpenFilesIterator(namenode, tracer, openFilesTypes);
+    return listOpenFiles(openFilesTypes,
+        OpenFilesIterator.FILTER_PATH_DEFAULT);
+  }
+
+  /**
+   * Get a remote iterator to the open files list by type and path,
+   * managed by NameNode.
+   *
+   * @param openFilesTypes
+   * @param path
+   * @throws IOException
+   */
+  public RemoteIterator<OpenFileEntry> listOpenFiles(
+      EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
+    checkOpen();
+    return new OpenFilesIterator(namenode, tracer, openFilesTypes, path);
   }
 }


@@ -3086,8 +3086,8 @@ public class DistributedFileSystem extends FileSystem
   }
 
   public RemoteIterator<OpenFileEntry> listOpenFiles(
-      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
-    return dfs.listOpenFiles(openFilesTypes);
+      EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
+    return dfs.listOpenFiles(openFilesTypes, path);
   }


@@ -659,8 +659,8 @@ public class HdfsAdmin {
   }
 
   public RemoteIterator<OpenFileEntry> listOpenFiles(
-      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
-    return dfs.listOpenFiles(openFilesTypes);
+      EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
+    return dfs.listOpenFiles(openFilesTypes, path);
   }
 }


@@ -1724,10 +1724,11 @@ public interface ClientProtocol {
    * the the list across batches are not atomic.
    *
    * @param prevId the cursor INode id.
-   * @param openFilesTypes types to filter the open files
+   * @param openFilesTypes types to filter the open files.
+   * @param path path to filter the open files.
    * @throws IOException
    */
   @Idempotent
   BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
-      EnumSet<OpenFilesType> openFilesTypes) throws IOException;
+      EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException;
 }


@@ -37,6 +37,9 @@ import org.apache.htrace.core.Tracer;
 public class OpenFilesIterator extends
     BatchedRemoteIterator<Long, OpenFileEntry> {
 
+  /** No path to be filtered by default. */
+  public static final String FILTER_PATH_DEFAULT = "/";
+
   /**
    * Open file types to filter the results.
    */
@@ -67,20 +70,23 @@ public class OpenFilesIterator extends
   private final ClientProtocol namenode;
   private final Tracer tracer;
   private final EnumSet<OpenFilesType> types;
+  /** List files filtered by given path. */
+  private String path;
 
   public OpenFilesIterator(ClientProtocol namenode, Tracer tracer,
-      EnumSet<OpenFilesType> types) {
+      EnumSet<OpenFilesType> types, String path) {
     super(HdfsConstants.GRANDFATHER_INODE_ID);
     this.namenode = namenode;
     this.tracer = tracer;
     this.types = types;
+    this.path = path;
   }
 
   @Override
   public BatchedEntries<OpenFileEntry> makeRequest(Long prevId)
       throws IOException {
     try (TraceScope ignored = tracer.newScope("listOpenFiles")) {
-      return namenode.listOpenFiles(prevId, types);
+      return namenode.listOpenFiles(prevId, types, path);
     }
   }


@@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -1898,17 +1899,20 @@ public class ClientNamenodeProtocolTranslatorPB implements
   @Override
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
       throws IOException {
-    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+        OpenFilesIterator.FILTER_PATH_DEFAULT);
   }
 
   @Override
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
-      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+      EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
     ListOpenFilesRequestProto.Builder req =
         ListOpenFilesRequestProto.newBuilder().setId(prevId);
     if (openFilesTypes != null) {
       req.addAllTypes(PBHelperClient.convertOpenFileTypes(openFilesTypes));
     }
+    req.setPath(path);
     try {
       ListOpenFilesResponseProto response =
           rpcProxy.listOpenFiles(null, req.build());


@@ -804,6 +804,7 @@ enum OpenFilesTypeProto {
 message ListOpenFilesRequestProto {
   required int64 id = 1;
   repeated OpenFilesTypeProto types = 2;
+  optional string path = 3;
 }
 
 message OpenFilesBatchResponseProto {
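Because path is added as an optional proto2 field, requests from older clients that never set it stay wire-compatible; on the server side req.getPath() then returns the empty string, which the NameNode treats as "no filter" (see the StringUtils.isEmpty(path) checks in FSNamesystem and LeaseManager below). A small sketch of the request construction, mirroring the translator change above; the package and class name of the generated bindings (ClientNamenodeProtocolProtos) is an assumption:

    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto;

    public class ListOpenFilesRequestSketch {
      public static void main(String[] args) {
        // Request carrying the new optional path filter (field 3).
        ListOpenFilesRequestProto withPath = ListOpenFilesRequestProto
            .newBuilder().setId(0).setPath("/tmp/files/a").build();

        // An older client never calls setPath(); an unset proto2 optional
        // string reads back as "", so the NameNode-side isEmpty(path) check
        // falls back to listing all open files.
        ListOpenFilesRequestProto legacy = ListOpenFilesRequestProto
            .newBuilder().setId(0).build();

        System.out.println(withPath.getPath());          // /tmp/files/a
        System.out.println(legacy.getPath().isEmpty());  // true
      }
    }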


@@ -1856,7 +1856,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       EnumSet<OpenFilesType> openFilesTypes =
           PBHelperClient.convertOpenFileTypes(req.getTypesList());
       BatchedEntries<OpenFileEntry> entries = server.listOpenFiles(req.getId(),
-          openFilesTypes);
+          openFilesTypes, req.getPath());
       ListOpenFilesResponseProto.Builder builder =
           ListOpenFilesResponseProto.newBuilder();
       builder.setHasMore(entries.hasMore());


@@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@@ -1940,12 +1941,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
   @Override
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
       throws IOException {
-    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+        OpenFilesIterator.FILTER_PATH_DEFAULT);
   }
 
   @Override
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
-      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+      EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
     checkOperation(OperationCategory.READ, false);
     return null;
   }


@@ -1767,11 +1767,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * TODO: HDFS-12969 - to report open files by type.
    *
    * @param prevId the cursor INode id.
-   * @param openFilesTypes
+   * @param openFilesTypes types to filter the open files.
+   * @param path path to filter the open files.
    * @throws IOException
    */
   BatchedListEntries<OpenFileEntry> listOpenFiles(long prevId,
-      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+      EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
     final String operationName = "listOpenFiles";
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.READ);
@@ -1780,10 +1781,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       checkOperation(OperationCategory.READ);
       if(openFilesTypes.contains(OpenFilesType.ALL_OPEN_FILES)) {
-        batchedListEntries = leaseManager.getUnderConstructionFiles(prevId);
+        batchedListEntries = leaseManager.getUnderConstructionFiles(prevId,
+            path);
       } else {
         if(openFilesTypes.contains(OpenFilesType.BLOCKING_DECOMMISSION)) {
-          batchedListEntries = getFilesBlockingDecom(prevId);
+          batchedListEntries = getFilesBlockingDecom(prevId, path);
         } else {
           throw new IllegalArgumentException("Unknown OpenFileType: "
               + openFilesTypes);
@@ -1799,7 +1801,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return batchedListEntries;
   }
 
-  public BatchedListEntries<OpenFileEntry> getFilesBlockingDecom(long prevId) {
+  public BatchedListEntries<OpenFileEntry> getFilesBlockingDecom(long prevId,
+      String path) {
     assert hasReadLock();
     final List<OpenFileEntry> openFileEntries = Lists.newArrayList();
     LightWeightHashSet<Long> openFileIds = new LightWeightHashSet<>();
@@ -1817,10 +1820,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         Preconditions.checkState(ucFile instanceof INodeFile);
         openFileIds.add(ucFileId);
         INodeFile inodeFile = ucFile.asFile();
-        openFileEntries.add(new OpenFileEntry(
-            inodeFile.getId(), inodeFile.getFullPathName(),
-            inodeFile.getFileUnderConstructionFeature().getClientName(),
-            inodeFile.getFileUnderConstructionFeature().getClientMachine()));
+
+        String fullPathName = inodeFile.getFullPathName();
+        if (org.apache.commons.lang.StringUtils.isEmpty(path)
+            || fullPathName.startsWith(path)) {
+          openFileEntries.add(new OpenFileEntry(inodeFile.getId(),
+              inodeFile.getFullPathName(),
+              inodeFile.getFileUnderConstructionFeature().getClientName(),
+              inodeFile.getFileUnderConstructionFeature().getClientMachine()));
+        }
+
         if (openFileIds.size() >= this.maxListOpenFilesResponses) {
           return new BatchedListEntries<>(openFileEntries, true);
         }


@@ -37,10 +37,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.util.Daemon;
@@ -258,6 +261,12 @@ public class LeaseManager {
     return iipSet;
   }
 
+  public BatchedListEntries<OpenFileEntry> getUnderConstructionFiles(
+      final long prevId) throws IOException {
+    return getUnderConstructionFiles(prevId,
+        OpenFilesIterator.FILTER_PATH_DEFAULT);
+  }
+
   /**
    * Get a batch of under construction files from the currently active leases.
    * File INodeID is the cursor used to fetch new batch of results and the
@@ -270,7 +279,7 @@ public class LeaseManager {
    * @throws IOException
    */
   public BatchedListEntries<OpenFileEntry> getUnderConstructionFiles(
-      final long prevId) throws IOException {
+      final long prevId, final String path) throws IOException {
     assert fsnamesystem.hasReadLock();
     SortedMap<Long, Lease> remainingLeases;
     synchronized (this) {
@@ -283,6 +292,7 @@ public class LeaseManager {
         Lists.newArrayListWithExpectedSize(numResponses);
 
     int count = 0;
+    String fullPathName = null;
     for (Long inodeId: inodeIds) {
       final INodeFile inodeFile =
           fsnamesystem.getFSDirectory().getInode(inodeId).asFile();
@@ -291,11 +301,15 @@ public class LeaseManager {
             inodeFile.getFullPathName());
         continue;
       }
-      openFileEntries.add(new OpenFileEntry(
-          inodeFile.getId(), inodeFile.getFullPathName(),
-          inodeFile.getFileUnderConstructionFeature().getClientName(),
-          inodeFile.getFileUnderConstructionFeature().getClientMachine()));
-      count++;
+
+      fullPathName = inodeFile.getFullPathName();
+      if (StringUtils.isEmpty(path) || fullPathName.startsWith(path)) {
+        openFileEntries.add(new OpenFileEntry(inodeFile.getId(), fullPathName,
+            inodeFile.getFileUnderConstructionFeature().getClientName(),
+            inodeFile.getFileUnderConstructionFeature().getClientMachine()));
+        count++;
+      }
+
       if (count >= numResponses) {
         break;
       }
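Both LeaseManager.getUnderConstructionFiles and FSNamesystem.getFilesBlockingDecom apply the same predicate: report an entry when the filter is empty or the file's full path starts with it. Note that this is a plain string-prefix match, so -path /a would also match /ab/..., and the default filter "/" (OpenFilesIterator.FILTER_PATH_DEFAULT) matches every file. A standalone sketch of that predicate; the class and method names here are illustrative only:

    import org.apache.commons.lang3.StringUtils;

    public class OpenFilePathFilterSketch {
      // Mirrors the filtering check added to LeaseManager and FSNamesystem:
      // an empty filter, or a string-prefix match on the full path name.
      static boolean matches(String filterPath, String fullPathName) {
        return StringUtils.isEmpty(filterPath)
            || fullPathName.startsWith(filterPath);
      }

      public static void main(String[] args) {
        System.out.println(matches("/", "/tmp/files/a/open-file-0"));    // true (default filter)
        System.out.println(matches("/tmp/files/a", "/tmp/files/a/f0"));  // true
        System.out.println(matches("/tmp/files/a", "/tmp/files/b/f1"));  // false
        System.out.println(matches("/a", "/ab/f2"));                     // true: prefix, not path-component, match
      }
    }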


@@ -115,6 +115,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@@ -1339,14 +1340,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
       throws IOException {
-    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+        OpenFilesIterator.FILTER_PATH_DEFAULT);
   }
 
   @Override // ClientProtocol
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
-      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+      EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
     checkNNStartup();
-    return namesystem.listOpenFiles(prevId, openFilesTypes);
+    return namesystem.listOpenFiles(prevId, openFilesTypes, path);
   }
 
   @Override // ClientProtocol


@@ -464,7 +464,7 @@ public class DFSAdmin extends FsShell {
     "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
     "\t[-metasave filename]\n" +
     "\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
-    "\t[-listOpenFiles [-blockingDecommission]]\n" +
+    "\t[-listOpenFiles [-blockingDecommission] [-path <path>]]\n" +
     "\t[-help [cmd]]\n";
 
   /**
@@ -918,16 +918,29 @@ public class DFSAdmin extends FsShell {
    * @param argv
    */
   public int listOpenFiles(String[] argv) throws IOException {
+    String path = null;
     List<OpenFilesType> types = new ArrayList<>();
     if (argv != null) {
       List<String> args = new ArrayList<>(Arrays.asList(argv));
       if (StringUtils.popOption("-blockingDecommission", args)) {
         types.add(OpenFilesType.BLOCKING_DECOMMISSION);
       }
+
+      path = StringUtils.popOptionWithArgument("-path", args);
     }
     if (types.isEmpty()) {
       types.add(OpenFilesType.ALL_OPEN_FILES);
     }
+
+    if (path != null) {
+      path = path.trim();
+      if (path.length() == 0) {
+        path = OpenFilesIterator.FILTER_PATH_DEFAULT;
+      }
+    } else {
+      path = OpenFilesIterator.FILTER_PATH_DEFAULT;
+    }
+
     EnumSet<OpenFilesType> openFilesTypes = EnumSet.copyOf(types);
 
     DistributedFileSystem dfs = getDFS();
@@ -941,9 +954,9 @@ public class DFSAdmin extends FsShell {
           dfsConf, HAUtil.getAddressOfActive(getDFS()), ClientProtocol.class,
           UserGroupInformation.getCurrentUser(), false);
       openFilesRemoteIterator = new OpenFilesIterator(proxy.getProxy(),
-          FsTracer.get(dfsConf), openFilesTypes);
+          FsTracer.get(dfsConf), openFilesTypes, path);
     } else {
-      openFilesRemoteIterator = dfs.listOpenFiles(openFilesTypes);
+      openFilesRemoteIterator = dfs.listOpenFiles(openFilesTypes, path);
     }
     printOpenFiles(openFilesRemoteIterator);
     return 0;
@@ -1982,7 +1995,7 @@ public class DFSAdmin extends FsShell {
           + " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]");
     } else if ("-listOpenFiles".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
-          + " [-listOpenFiles [-blockingDecommission]]");
+          + " [-listOpenFiles [-blockingDecommission] [-path <path>]]");
     } else {
       System.err.println("Usage: hdfs dfsadmin");
       System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
@@ -2137,7 +2150,7 @@ public class DFSAdmin extends FsShell {
         return exitCode;
       }
     } else if ("-listOpenFiles".equals(cmd)) {
-      if ((argv.length != 1) && (argv.length != 2)) {
+      if ((argv.length > 4)) {
         printUsage(cmd);
         return exitCode;
       }
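From the shell the new option is used as "hdfs dfsadmin -listOpenFiles -path /tmp/files/a"; a missing or empty -path argument falls back to the default filter "/". The command can also be driven programmatically through ToolRunner, which is how the tests in this commit exercise it; a minimal sketch, assuming a Configuration that points at a running cluster and a caller with HDFS superuser privilege:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class ListOpenFilesCliSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();  // assumes fs.defaultFS is set
        DFSAdmin dfsAdmin = new DFSAdmin(conf);

        // Equivalent to: hdfs dfsadmin -listOpenFiles -path /tmp/files/a
        int ret = ToolRunner.run(dfsAdmin,
            new String[] {"-listOpenFiles", "-path", "/tmp/files/a"});
        System.exit(ret);
      }
    }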


@@ -372,7 +372,7 @@ Usage:
         hdfs dfsadmin [-getDatanodeInfo <datanode_host:ipc_port>]
         hdfs dfsadmin [-metasave filename]
         hdfs dfsadmin [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]
-        hdfs dfsadmin [-listOpenFiles]
+        hdfs dfsadmin [-listOpenFiles [-blockingDecommission] [-path <path>]]
         hdfs dfsadmin [-help [cmd]]
 
 | COMMAND\_OPTION | Description |
@@ -409,7 +409,7 @@ Usage:
 | `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. |
 | `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following<br/>1. Datanodes heart beating with Namenode<br/>2. Blocks waiting to be replicated<br/>3. Blocks currently being replicated<br/>4. Blocks waiting to be deleted |
 | `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. |
-| `-listOpenFiles` `[-blockingDecommission]` | List all open files currently managed by the NameNode along with client name and client machine accessing them. |
+| `-listOpenFiles` `[-blockingDecommission]` `[-path <path>]` | List all open files currently managed by the NameNode along with client name and client machine accessing them. Open files list will be filtered by given type and path. |
 | `-help` [cmd] | Displays help for the given command or all commands if none is specified. |
 
 Runs a HDFS dfsadmin client.


@@ -710,13 +710,49 @@ public class TestDecommission extends AdminStatesBaseTest {
       @Override
       public Boolean get() {
         try {
+          boolean result1 = false;
+          boolean result2 = false;
           toolOut.reset();
           assertEquals(0, ToolRunner.run(dfsAdmin,
               new String[]{"-listOpenFiles", "-blockingDecommission"}));
           toolOut.flush();
-          return verifyOpenFilesListing(
+          result1 = verifyOpenFilesListing(
               "dfsadmin -listOpenFiles -blockingDecommission",
               closedFileSet, openFilesMap, toolOut, maxOpenFiles);
+
+          // test -blockingDecommission with option -path
+          if (openFilesMap.size() > 0) {
+            String firstOpenFile = null;
+            // Construct a new open-file and close-file map.
+            // Pick the first open file into new open-file map, remaining
+            // open files move into close-files map.
+            HashMap<Path, FSDataOutputStream> newOpenFilesMap =
+                new HashMap<>();
+            HashSet<Path> newClosedFileSet = new HashSet<>();
+            for (Map.Entry<Path, FSDataOutputStream> entry : openFilesMap
+                .entrySet()) {
+              if (firstOpenFile == null) {
+                newOpenFilesMap.put(entry.getKey(), entry.getValue());
+                firstOpenFile = entry.getKey().toString();
+              } else {
+                newClosedFileSet.add(entry.getKey());
+              }
+            }
+
+            toolOut.reset();
+            assertEquals(0,
+                ToolRunner.run(dfsAdmin, new String[] {"-listOpenFiles",
+                    "-blockingDecommission", "-path", firstOpenFile}));
+            toolOut.flush();
+            result2 = verifyOpenFilesListing(
+                "dfsadmin -listOpenFiles -blockingDecommission -path"
+                    + firstOpenFile,
+                newClosedFileSet, newOpenFilesMap, toolOut, 1);
+          } else {
+            result2 = true;
+          }
+
+          return result1 && result2;
         } catch (Exception e) {
           LOG.warn("Unexpected exception: " + e);
         }


@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.junit.After;
@@ -256,7 +257,8 @@ public class TestHdfsAdmin {
     HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
     HashSet<Path> openFiles = new HashSet<>(openFileMap.keySet());
     RemoteIterator<OpenFileEntry> openFilesRemoteItr =
-        hdfsAdmin.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+        hdfsAdmin.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+            OpenFilesIterator.FILTER_PATH_DEFAULT);
     while (openFilesRemoteItr.hasNext()) {
       String filePath = openFilesRemoteItr.next().getFilePath();
       assertFalse(filePath + " should not be listed under open files!",


@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
@@ -405,8 +406,11 @@ public class TestLeaseManager {
         leaseManager.getINodeWithLeases(ancestorDirectory).size());
     assertEquals(iNodeIdWithLeaseCount,
         leaseManager.getUnderConstructionFiles(0).size());
-    assertEquals(0, (fsNamesystem.getFilesBlockingDecom(0) == null ?
-        0 : fsNamesystem.getFilesBlockingDecom(0).size()));
+    assertEquals(0,
+        (fsNamesystem.getFilesBlockingDecom(0,
+            OpenFilesIterator.FILTER_PATH_DEFAULT) == null ? 0
+            : fsNamesystem.getFilesBlockingDecom(0,
+                OpenFilesIterator.FILTER_PATH_DEFAULT).size()));
   }
 
   private Map<String, INode> createINodeTree(INodeDirectory parentDir,


@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -97,11 +98,13 @@ public class TestListOpenFiles {
     verifyOpenFiles(openFiles);
 
     BatchedEntries<OpenFileEntry> openFileEntryBatchedEntries =
-        nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+        nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+            OpenFilesIterator.FILTER_PATH_DEFAULT);
     assertTrue("Open files list should be empty!",
         openFileEntryBatchedEntries.size() == 0);
 
     BatchedEntries<OpenFileEntry> openFilesBlockingDecomEntries =
-        nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION));
+        nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION),
+            OpenFilesIterator.FILTER_PATH_DEFAULT);
     assertTrue("Open files list blocking decommission should be empty!",
         openFilesBlockingDecomEntries.size() == 0);
@@ -128,15 +131,16 @@ public class TestListOpenFiles {
   }
 
   private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles,
-      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+      EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
     HashSet<Path> remainingFiles = new HashSet<>(openFiles.keySet());
     OpenFileEntry lastEntry = null;
     BatchedEntries<OpenFileEntry> batchedEntries;
     do {
       if (lastEntry == null) {
-        batchedEntries = nnRpc.listOpenFiles(0, openFilesTypes);
+        batchedEntries = nnRpc.listOpenFiles(0, openFilesTypes, path);
       } else {
-        batchedEntries = nnRpc.listOpenFiles(lastEntry.getId(), openFilesTypes);
+        batchedEntries = nnRpc.listOpenFiles(lastEntry.getId(),
+            openFilesTypes, path);
       }
       assertTrue("Incorrect open files list size!",
           batchedEntries.size() <= BATCH_SIZE);
@@ -154,9 +158,11 @@ public class TestListOpenFiles {
 
   private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles)
       throws IOException {
-    verifyOpenFiles(openFiles, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+    verifyOpenFiles(openFiles, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+        OpenFilesIterator.FILTER_PATH_DEFAULT);
     verifyOpenFiles(new HashMap<>(),
-        EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION));
+        EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION),
+        OpenFilesIterator.FILTER_PATH_DEFAULT);
   }
 
   private Set<Path> createFiles(FileSystem fileSystem, String fileNamePrefix,


@@ -725,6 +725,67 @@ public class TestDFSAdmin {
             new String[]{"-listOpenFiles"}));
         verifyOpenFilesListing(closedFileSet, openFilesMap);
       }
+
+      // test -listOpenFiles command with option <path>
+      openFilesMap.clear();
+      Path file;
+      HashMap<Path, FSDataOutputStream> openFiles1 = new HashMap<>();
+      HashMap<Path, FSDataOutputStream> openFiles2 = new HashMap<>();
+      for (int i = 0; i < numOpenFiles; i++) {
+        if (i % 2 == 0) {
+          file = new Path(new Path("/tmp/files/a"), "open-file-" + i);
+        } else {
+          file = new Path(new Path("/tmp/files/b"), "open-file-" + i);
+        }
+
+        DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
+        FSDataOutputStream outputStream = fs.append(file);
+
+        if (i % 2 == 0) {
+          openFiles1.put(file, outputStream);
+        } else {
+          openFiles2.put(file, outputStream);
+        }
+        openFilesMap.put(file, outputStream);
+      }
+
+      resetStream();
+      // list all open files
+      assertEquals(0,
+          ToolRunner.run(dfsAdmin, new String[] {"-listOpenFiles"}));
+      verifyOpenFilesListing(null, openFilesMap);
+
+      resetStream();
+      // list open files under directory path /tmp/files/a
+      assertEquals(0, ToolRunner.run(dfsAdmin,
+          new String[] {"-listOpenFiles", "-path", "/tmp/files/a"}));
+      verifyOpenFilesListing(null, openFiles1);
+
+      resetStream();
+      // list open files without input path
+      assertEquals(-1, ToolRunner.run(dfsAdmin,
+          new String[] {"-listOpenFiles", "-path"}));
+      // verify the error
+      String outStr = scanIntoString(err);
+      assertTrue(outStr.contains("listOpenFiles: option"
+          + " -path requires 1 argument"));
+
+      resetStream();
+      // list open files with empty path
+      assertEquals(0, ToolRunner.run(dfsAdmin,
+          new String[] {"-listOpenFiles", "-path", ""}));
+      // all the open files will be listed
+      verifyOpenFilesListing(null, openFilesMap);
+
+      resetStream();
+      // list invalid path file
+      assertEquals(0, ToolRunner.run(dfsAdmin,
+          new String[] {"-listOpenFiles", "-path", "/invalid_path"}));
+      outStr = scanIntoString(out);
+      for (Path openFilePath : openFilesMap.keySet()) {
+        assertThat(outStr, not(containsString(openFilePath.toString())));
+      }
+
+      DFSTestUtil.closeOpenFiles(openFilesMap, openFilesMap.size());
     }
   }
@@ -732,9 +793,13 @@ public class TestDFSAdmin {
       HashMap<Path, FSDataOutputStream> openFilesMap) {
     final String outStr = scanIntoString(out);
     LOG.info("dfsadmin -listOpenFiles output: \n" + out);
-    for (Path closedFilePath : closedFileSet) {
-      assertThat(outStr, not(containsString(closedFilePath.toString() + "\n")));
+    if (closedFileSet != null) {
+      for (Path closedFilePath : closedFileSet) {
+        assertThat(outStr,
+            not(containsString(closedFilePath.toString() + "\n")));
+      }
     }
+
     for (Path openFilePath : openFilesMap.keySet()) {
       assertThat(outStr, is(containsString(openFilePath.toString() + "\n")));
     }