HDFS-10480. Add an admin command to list currently open files.
This commit is contained in:
parent
168d8e0c04
commit
fc66e76a4a
|
@ -134,6 +134,8 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
|
||||||
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||||
|
@ -3085,4 +3087,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
Tracer getTracer() {
|
Tracer getTracer() {
|
||||||
return tracer;
|
return tracer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a remote iterator to the open files list managed by NameNode.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
|
||||||
|
checkOpen();
|
||||||
|
return new OpenFilesIterator(namenode, tracer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||||
|
@ -2556,4 +2557,18 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
DFSOpsCountStatistics getDFSOpsCountStatistics() {
|
DFSOpsCountStatistics getDFSOpsCountStatistics() {
|
||||||
return storageStatistics;
|
return storageStatistics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a RemoteIterator which can be used to list all open files
|
||||||
|
* currently managed by the NameNode. For large numbers of open files,
|
||||||
|
* iterator will fetch the list in batches of configured size.
|
||||||
|
* <p/>
|
||||||
|
* Since the list is fetched in batches, it does not represent a
|
||||||
|
* consistent snapshot of the all open files.
|
||||||
|
* <p/>
|
||||||
|
* This method can only be called by HDFS superusers.
|
||||||
|
*/
|
||||||
|
public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
|
||||||
|
return dfs.listOpenFiles();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -508,4 +509,18 @@ public class HdfsAdmin {
|
||||||
dfs.mkdir(trashPath, TRASH_PERMISSION);
|
dfs.mkdir(trashPath, TRASH_PERMISSION);
|
||||||
dfs.setPermission(trashPath, TRASH_PERMISSION);
|
dfs.setPermission(trashPath, TRASH_PERMISSION);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a RemoteIterator which can be used to list all open files
|
||||||
|
* currently managed by the NameNode. For large numbers of open files,
|
||||||
|
* iterator will fetch the list in batches of configured size.
|
||||||
|
* <p/>
|
||||||
|
* Since the list is fetched in batches, it does not represent a
|
||||||
|
* consistent snapshot of the all open files.
|
||||||
|
* <p/>
|
||||||
|
* This method can only be called by HDFS superusers.
|
||||||
|
*/
|
||||||
|
public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
|
||||||
|
return dfs.listOpenFiles();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1514,4 +1514,16 @@ public interface ClientProtocol {
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
QuotaUsage getQuotaUsage(String path) throws IOException;
|
QuotaUsage getQuotaUsage(String path) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List open files in the system in batches. INode id is the cursor and the
|
||||||
|
* open files returned in a batch will have their INode ids greater than
|
||||||
|
* the cursor INode id. Open files can only be requested by super user and
|
||||||
|
* the the list across batches are not atomic.
|
||||||
|
*
|
||||||
|
* @param prevId the cursor INode id.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Idempotent
|
||||||
|
BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.protocol;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An open file entry for use by DFSAdmin commands.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class OpenFileEntry {
|
||||||
|
private final long id;
|
||||||
|
private final String filePath;
|
||||||
|
private final String clientName;
|
||||||
|
private final String clientMachine;
|
||||||
|
|
||||||
|
public OpenFileEntry(long id, String filePath,
|
||||||
|
String clientName, String clientMachine) {
|
||||||
|
this.id = id;
|
||||||
|
this.filePath = filePath;
|
||||||
|
this.clientName = clientName;
|
||||||
|
this.clientMachine = clientMachine;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFilePath() {
|
||||||
|
return filePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getClientMachine() {
|
||||||
|
return clientMachine;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getClientName() {
|
||||||
|
return clientName;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.protocol;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.fs.BatchedRemoteIterator;
|
||||||
|
import org.apache.htrace.core.TraceScope;
|
||||||
|
import org.apache.htrace.core.Tracer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OpenFilesIterator is a remote iterator that iterates over the open files list
|
||||||
|
* managed by the NameNode. Since the list is retrieved in batches, it does not
|
||||||
|
* represent a consistent view of all open files.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class OpenFilesIterator extends
|
||||||
|
BatchedRemoteIterator<Long, OpenFileEntry> {
|
||||||
|
private final ClientProtocol namenode;
|
||||||
|
private final Tracer tracer;
|
||||||
|
|
||||||
|
public OpenFilesIterator(ClientProtocol namenode, Tracer tracer) {
|
||||||
|
super(HdfsConstants.GRANDFATHER_INODE_ID);
|
||||||
|
this.namenode = namenode;
|
||||||
|
this.tracer = tracer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BatchedEntries<OpenFileEntry> makeRequest(Long prevId)
|
||||||
|
throws IOException {
|
||||||
|
try (TraceScope ignored = tracer.newScope("listOpenFiles")) {
|
||||||
|
return namenode.listOpenFiles(prevId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long elementToPrevKey(OpenFileEntry entry) {
|
||||||
|
return entry.getId();
|
||||||
|
}
|
||||||
|
}
|
|
@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||||
|
@ -131,10 +132,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCa
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
|
||||||
|
@ -1583,4 +1587,23 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
|
||||||
|
throws IOException {
|
||||||
|
ListOpenFilesRequestProto req =
|
||||||
|
ListOpenFilesRequestProto.newBuilder().setId(prevId).build();
|
||||||
|
try {
|
||||||
|
ListOpenFilesResponseProto response = rpcProxy.listOpenFiles(null, req);
|
||||||
|
List<OpenFileEntry> openFileEntries =
|
||||||
|
Lists.newArrayListWithCapacity(response.getEntriesCount());
|
||||||
|
for (OpenFilesBatchResponseProto p : response.getEntriesList()) {
|
||||||
|
openFileEntries.add(PBHelperClient.convert(p));
|
||||||
|
}
|
||||||
|
return new BatchedListEntries<>(openFileEntries, response.getHasMore());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
|
@ -109,6 +110,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Datano
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
|
||||||
|
@ -1071,6 +1073,21 @@ public class PBHelperClient {
|
||||||
proto.getKeyName());
|
proto.getKeyName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static OpenFilesBatchResponseProto convert(OpenFileEntry
|
||||||
|
openFileEntry) {
|
||||||
|
return OpenFilesBatchResponseProto.newBuilder()
|
||||||
|
.setId(openFileEntry.getId())
|
||||||
|
.setPath(openFileEntry.getFilePath())
|
||||||
|
.setClientName(openFileEntry.getClientName())
|
||||||
|
.setClientMachine(openFileEntry.getClientMachine())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static OpenFileEntry convert(OpenFilesBatchResponseProto proto) {
|
||||||
|
return new OpenFileEntry(proto.getId(), proto.getPath(),
|
||||||
|
proto.getClientName(), proto.getClientMachine());
|
||||||
|
}
|
||||||
|
|
||||||
public static AclStatus convert(GetAclStatusResponseProto e) {
|
public static AclStatus convert(GetAclStatusResponseProto e) {
|
||||||
AclStatusProto r = e.getResult();
|
AclStatusProto r = e.getResult();
|
||||||
AclStatus.Builder builder = new AclStatus.Builder();
|
AclStatus.Builder builder = new AclStatus.Builder();
|
||||||
|
|
|
@ -743,6 +743,22 @@ message GetEditsFromTxidResponseProto {
|
||||||
required EventsListProto eventsList = 1;
|
required EventsListProto eventsList = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message ListOpenFilesRequestProto {
|
||||||
|
required int64 id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message OpenFilesBatchResponseProto {
|
||||||
|
required int64 id = 1;
|
||||||
|
required string path = 2;
|
||||||
|
required string clientName = 3;
|
||||||
|
required string clientMachine = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ListOpenFilesResponseProto {
|
||||||
|
repeated OpenFilesBatchResponseProto entries = 1;
|
||||||
|
required bool hasMore = 2;
|
||||||
|
}
|
||||||
|
|
||||||
service ClientNamenodeProtocol {
|
service ClientNamenodeProtocol {
|
||||||
rpc getBlockLocations(GetBlockLocationsRequestProto)
|
rpc getBlockLocations(GetBlockLocationsRequestProto)
|
||||||
returns(GetBlockLocationsResponseProto);
|
returns(GetBlockLocationsResponseProto);
|
||||||
|
@ -895,4 +911,6 @@ service ClientNamenodeProtocol {
|
||||||
returns(GetEditsFromTxidResponseProto);
|
returns(GetEditsFromTxidResponseProto);
|
||||||
rpc getQuotaUsage(GetQuotaUsageRequestProto)
|
rpc getQuotaUsage(GetQuotaUsageRequestProto)
|
||||||
returns(GetQuotaUsageResponseProto);
|
returns(GetQuotaUsageResponseProto);
|
||||||
|
rpc listOpenFiles(ListOpenFilesRequestProto)
|
||||||
|
returns(ListOpenFilesResponseProto);
|
||||||
}
|
}
|
||||||
|
|
|
@ -765,6 +765,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
|
HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
|
||||||
public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
|
public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
|
||||||
public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
|
public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
|
||||||
|
public static final String DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES =
|
||||||
|
"dfs.namenode.list.openfiles.num.responses";
|
||||||
|
public static final int DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT =
|
||||||
|
1000;
|
||||||
public static final String DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY = "dfs.namenode.edekcacheloader.interval.ms";
|
public static final String DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY = "dfs.namenode.edekcacheloader.interval.ms";
|
||||||
public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000;
|
public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000;
|
||||||
public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms";
|
public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms";
|
||||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||||
|
@ -143,6 +144,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCa
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
|
||||||
|
@ -1565,4 +1568,21 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ListOpenFilesResponseProto listOpenFiles(RpcController controller,
|
||||||
|
ListOpenFilesRequestProto req) throws ServiceException {
|
||||||
|
try {
|
||||||
|
BatchedEntries<OpenFileEntry> entries = server.listOpenFiles(req.getId());
|
||||||
|
ListOpenFilesResponseProto.Builder builder =
|
||||||
|
ListOpenFilesResponseProto.newBuilder();
|
||||||
|
builder.setHasMore(entries.hasMore());
|
||||||
|
for (int i = 0; i < entries.size(); i++) {
|
||||||
|
builder.addEntries(PBHelperClient.convert(entries.get(i)));
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -203,6 +203,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
|
||||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||||
|
@ -411,6 +412,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
/** Maximum time the lock is hold to release lease. */
|
/** Maximum time the lock is hold to release lease. */
|
||||||
private final long maxLockHoldToReleaseLeaseMs;
|
private final long maxLockHoldToReleaseLeaseMs;
|
||||||
|
|
||||||
|
// Batch size for open files response
|
||||||
|
private final int maxListOpenFilesResponses;
|
||||||
|
|
||||||
// Scan interval is not configurable.
|
// Scan interval is not configurable.
|
||||||
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
|
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
|
||||||
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
|
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
|
||||||
|
@ -880,6 +884,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf);
|
inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf);
|
||||||
LOG.info("Using INode attribute provider: " + klass.getName());
|
LOG.info("Using INode attribute provider: " + klass.getName());
|
||||||
}
|
}
|
||||||
|
this.maxListOpenFilesResponses = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT
|
||||||
|
);
|
||||||
|
Preconditions.checkArgument(maxListOpenFilesResponses > 0,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES +
|
||||||
|
" must be a positive integer."
|
||||||
|
);
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
|
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
|
||||||
close();
|
close();
|
||||||
|
@ -911,6 +923,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
return maxLockHoldToReleaseLeaseMs;
|
return maxLockHoldToReleaseLeaseMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getMaxListOpenFilesResponses() {
|
||||||
|
return maxListOpenFilesResponses;
|
||||||
|
}
|
||||||
|
|
||||||
void lockRetryCache() {
|
void lockRetryCache() {
|
||||||
if (retryCache != null) {
|
if (retryCache != null) {
|
||||||
retryCache.lock();
|
retryCache.lock();
|
||||||
|
@ -1688,6 +1704,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
blockManager.metaSave(out);
|
blockManager.metaSave(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List open files in the system in batches. prevId is the cursor INode id and
|
||||||
|
* the open files returned in a batch will have their INode ids greater than
|
||||||
|
* this cursor. Open files can only be requested by super user and the the
|
||||||
|
* list across batches does not represent a consistent view of all open files.
|
||||||
|
*
|
||||||
|
* @param prevId the cursor INode id.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
BatchedListEntries<OpenFileEntry> listOpenFiles(long prevId)
|
||||||
|
throws IOException {
|
||||||
|
final String operationName = "listOpenFiles";
|
||||||
|
checkSuperuserPrivilege();
|
||||||
|
checkOperation(OperationCategory.READ);
|
||||||
|
readLock();
|
||||||
|
BatchedListEntries<OpenFileEntry> batchedListEntries;
|
||||||
|
try {
|
||||||
|
checkOperation(OperationCategory.READ);
|
||||||
|
batchedListEntries = leaseManager.getUnderConstructionFiles(prevId);
|
||||||
|
} catch (AccessControlException e) {
|
||||||
|
logAuditEvent(false, operationName, null);
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
readUnlock(operationName);
|
||||||
|
}
|
||||||
|
logAuditEvent(true, operationName, null);
|
||||||
|
return batchedListEntries;
|
||||||
|
}
|
||||||
|
|
||||||
private String metaSaveAsString() {
|
private String metaSaveAsString() {
|
||||||
StringWriter sw = new StringWriter();
|
StringWriter sw = new StringWriter();
|
||||||
PrintWriter pw = new PrintWriter(sw);
|
PrintWriter pw = new PrintWriter(sw);
|
||||||
|
|
|
@ -24,17 +24,19 @@ import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.PriorityQueue;
|
import java.util.PriorityQueue;
|
||||||
import java.util.SortedMap;
|
import java.util.SortedMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
|
@ -90,7 +92,7 @@ public class LeaseManager {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
// INodeID -> Lease
|
// INodeID -> Lease
|
||||||
private final HashMap<Long, Lease> leasesById = new HashMap<>();
|
private final TreeMap<Long, Lease> leasesById = new TreeMap<>();
|
||||||
|
|
||||||
private Daemon lmthread;
|
private Daemon lmthread;
|
||||||
private volatile boolean shouldRunMonitor;
|
private volatile boolean shouldRunMonitor;
|
||||||
|
@ -151,6 +153,52 @@ public class LeaseManager {
|
||||||
|
|
||||||
Collection<Long> getINodeIdWithLeases() {return leasesById.keySet();}
|
Collection<Long> getINodeIdWithLeases() {return leasesById.keySet();}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a batch of under construction files from the currently active leases.
|
||||||
|
* File INodeID is the cursor used to fetch new batch of results and the
|
||||||
|
* batch size is configurable using below config param. Since the list is
|
||||||
|
* fetched in batches, it does not represent a consistent view of all
|
||||||
|
* open files.
|
||||||
|
*
|
||||||
|
* @see org.apache.hadoop.hdfs.DFSConfigKeys#DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES
|
||||||
|
* @param prevId the INodeID cursor
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public BatchedListEntries<OpenFileEntry> getUnderConstructionFiles(
|
||||||
|
final long prevId) throws IOException {
|
||||||
|
assert fsnamesystem.hasReadLock();
|
||||||
|
SortedMap<Long, Lease> remainingLeases;
|
||||||
|
synchronized (this) {
|
||||||
|
remainingLeases = leasesById.tailMap(prevId, false);
|
||||||
|
}
|
||||||
|
Collection<Long> inodeIds = remainingLeases.keySet();
|
||||||
|
final int numResponses = Math.min(
|
||||||
|
this.fsnamesystem.getMaxListOpenFilesResponses(), inodeIds.size());
|
||||||
|
final List<OpenFileEntry> openFileEntries =
|
||||||
|
Lists.newArrayListWithExpectedSize(numResponses);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
for (Long inodeId: inodeIds) {
|
||||||
|
final INodeFile inodeFile =
|
||||||
|
fsnamesystem.getFSDirectory().getInode(inodeId).asFile();
|
||||||
|
if (!inodeFile.isUnderConstruction()) {
|
||||||
|
LOG.warn("The file " + inodeFile.getFullPathName()
|
||||||
|
+ " is not under construction but has lease.");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
openFileEntries.add(new OpenFileEntry(
|
||||||
|
inodeFile.getId(), inodeFile.getFullPathName(),
|
||||||
|
inodeFile.getFileUnderConstructionFeature().getClientName(),
|
||||||
|
inodeFile.getFileUnderConstructionFeature().getClientMachine()));
|
||||||
|
count++;
|
||||||
|
if (count >= numResponses) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
boolean hasMore = (numResponses < remainingLeases.size());
|
||||||
|
return new BatchedListEntries<>(openFileEntries, hasMore);
|
||||||
|
}
|
||||||
|
|
||||||
/** @return the lease containing src */
|
/** @return the lease containing src */
|
||||||
public synchronized Lease getLease(INodeFile src) {return leasesById.get(src.getId());}
|
public synchronized Lease getLease(INodeFile src) {return leasesById.get(src.getId());}
|
||||||
|
|
||||||
|
|
|
@ -104,6 +104,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||||
|
@ -1260,6 +1261,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
namesystem.metaSave(filename);
|
namesystem.metaSave(filename);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override // ClientProtocol
|
||||||
|
public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
|
||||||
|
throws IOException {
|
||||||
|
checkNNStartup();
|
||||||
|
return namesystem.listOpenFiles(prevId);
|
||||||
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
|
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -48,7 +48,9 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FsShell;
|
import org.apache.hadoop.fs.FsShell;
|
||||||
import org.apache.hadoop.fs.FsStatus;
|
import org.apache.hadoop.fs.FsStatus;
|
||||||
|
import org.apache.hadoop.fs.FsTracer;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.fs.shell.Command;
|
import org.apache.hadoop.fs.shell.Command;
|
||||||
import org.apache.hadoop.fs.shell.CommandFormat;
|
import org.apache.hadoop.fs.shell.CommandFormat;
|
||||||
import org.apache.hadoop.fs.shell.PathData;
|
import org.apache.hadoop.fs.shell.PathData;
|
||||||
|
@ -71,6 +73,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
|
||||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
@ -449,6 +453,7 @@ public class DFSAdmin extends FsShell {
|
||||||
"\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
|
"\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
|
||||||
"\t[-metasave filename]\n" +
|
"\t[-metasave filename]\n" +
|
||||||
"\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
|
"\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
|
||||||
|
"\t[-listOpenFiles]\n" +
|
||||||
"\t[-help [cmd]]\n";
|
"\t[-help [cmd]]\n";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -847,6 +852,45 @@ public class DFSAdmin extends FsShell {
|
||||||
return exitCode;
|
return exitCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Command to list all the open files currently managed by NameNode.
|
||||||
|
* Usage: hdfs dfsadmin -listOpenFiles
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public int listOpenFiles() throws IOException {
|
||||||
|
DistributedFileSystem dfs = getDFS();
|
||||||
|
Configuration dfsConf = dfs.getConf();
|
||||||
|
URI dfsUri = dfs.getUri();
|
||||||
|
boolean isHaEnabled = HAUtilClient.isLogicalUri(dfsConf, dfsUri);
|
||||||
|
|
||||||
|
RemoteIterator<OpenFileEntry> openFilesRemoteIterator;
|
||||||
|
if (isHaEnabled) {
|
||||||
|
ProxyAndInfo<ClientProtocol> proxy = NameNodeProxies.createNonHAProxy(
|
||||||
|
dfsConf, HAUtil.getAddressOfActive(getDFS()), ClientProtocol.class,
|
||||||
|
UserGroupInformation.getCurrentUser(), false);
|
||||||
|
openFilesRemoteIterator = new OpenFilesIterator(proxy.getProxy(),
|
||||||
|
FsTracer.get(dfsConf));
|
||||||
|
} else {
|
||||||
|
openFilesRemoteIterator = dfs.listOpenFiles();
|
||||||
|
}
|
||||||
|
printOpenFiles(openFilesRemoteIterator);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void printOpenFiles(RemoteIterator<OpenFileEntry> openFilesIterator)
|
||||||
|
throws IOException {
|
||||||
|
System.out.println(String.format("%-20s\t%-20s\t%s", "Client Host",
|
||||||
|
"Client Name", "Open File Path"));
|
||||||
|
while (openFilesIterator.hasNext()) {
|
||||||
|
OpenFileEntry openFileEntry = openFilesIterator.next();
|
||||||
|
System.out.println(String.format("%-20s\t%-20s\t%20s",
|
||||||
|
openFileEntry.getClientMachine(),
|
||||||
|
openFileEntry.getClientName(),
|
||||||
|
openFileEntry.getFilePath()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Command to ask the namenode to set the balancer bandwidth for all of the
|
* Command to ask the namenode to set the balancer bandwidth for all of the
|
||||||
* datanodes.
|
* datanodes.
|
||||||
|
@ -1095,6 +1139,10 @@ public class DFSAdmin extends FsShell {
|
||||||
+ "\tIf 'incremental' is specified, it will be an incremental\n"
|
+ "\tIf 'incremental' is specified, it will be an incremental\n"
|
||||||
+ "\tblock report; otherwise, it will be a full block report.\n";
|
+ "\tblock report; otherwise, it will be a full block report.\n";
|
||||||
|
|
||||||
|
String listOpenFiles = "-listOpenFiles\n"
|
||||||
|
+ "\tList all open files currently managed by the NameNode along\n"
|
||||||
|
+ "\twith client name and client machine accessing them.\n";
|
||||||
|
|
||||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
|
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
|
||||||
"\t\tis specified.\n";
|
"\t\tis specified.\n";
|
||||||
|
|
||||||
|
@ -1158,6 +1206,8 @@ public class DFSAdmin extends FsShell {
|
||||||
System.out.println(evictWriters);
|
System.out.println(evictWriters);
|
||||||
} else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
|
} else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
|
||||||
System.out.println(getDatanodeInfo);
|
System.out.println(getDatanodeInfo);
|
||||||
|
} else if ("listOpenFiles".equalsIgnoreCase(cmd)) {
|
||||||
|
System.out.println(listOpenFiles);
|
||||||
} else if ("help".equals(cmd)) {
|
} else if ("help".equals(cmd)) {
|
||||||
System.out.println(help);
|
System.out.println(help);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1193,6 +1243,7 @@ public class DFSAdmin extends FsShell {
|
||||||
System.out.println(evictWriters);
|
System.out.println(evictWriters);
|
||||||
System.out.println(getDatanodeInfo);
|
System.out.println(getDatanodeInfo);
|
||||||
System.out.println(triggerBlockReport);
|
System.out.println(triggerBlockReport);
|
||||||
|
System.out.println(listOpenFiles);
|
||||||
System.out.println(help);
|
System.out.println(help);
|
||||||
System.out.println();
|
System.out.println();
|
||||||
ToolRunner.printGenericCommandUsage(System.out);
|
ToolRunner.printGenericCommandUsage(System.out);
|
||||||
|
@ -1748,6 +1799,8 @@ public class DFSAdmin extends FsShell {
|
||||||
} else if ("-triggerBlockReport".equals(cmd)) {
|
} else if ("-triggerBlockReport".equals(cmd)) {
|
||||||
System.err.println("Usage: hdfs dfsadmin"
|
System.err.println("Usage: hdfs dfsadmin"
|
||||||
+ " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]");
|
+ " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]");
|
||||||
|
} else if ("-listOpenFiles".equals(cmd)) {
|
||||||
|
System.err.println("Usage: hdfs dfsadmin [-listOpenFiles]");
|
||||||
} else {
|
} else {
|
||||||
System.err.println("Usage: hdfs dfsadmin");
|
System.err.println("Usage: hdfs dfsadmin");
|
||||||
System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
|
System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
|
||||||
|
@ -1896,6 +1949,11 @@ public class DFSAdmin extends FsShell {
|
||||||
printUsage(cmd);
|
printUsage(cmd);
|
||||||
return exitCode;
|
return exitCode;
|
||||||
}
|
}
|
||||||
|
} else if ("-listOpenFiles".equals(cmd)) {
|
||||||
|
if (argv.length != 1) {
|
||||||
|
printUsage(cmd);
|
||||||
|
return exitCode;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize DFSAdmin
|
// initialize DFSAdmin
|
||||||
|
@ -1975,6 +2033,8 @@ public class DFSAdmin extends FsShell {
|
||||||
exitCode = reconfig(argv, i);
|
exitCode = reconfig(argv, i);
|
||||||
} else if ("-triggerBlockReport".equals(cmd)) {
|
} else if ("-triggerBlockReport".equals(cmd)) {
|
||||||
exitCode = triggerBlockReport(argv);
|
exitCode = triggerBlockReport(argv);
|
||||||
|
} else if ("-listOpenFiles".equals(cmd)) {
|
||||||
|
exitCode = listOpenFiles();
|
||||||
} else if ("-help".equals(cmd)) {
|
} else if ("-help".equals(cmd)) {
|
||||||
if (i < argv.length) {
|
if (i < argv.length) {
|
||||||
printHelp(argv[i]);
|
printHelp(argv[i]);
|
||||||
|
|
|
@ -2683,6 +2683,16 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.list.openfiles.num.responses</name>
|
||||||
|
<value>1000</value>
|
||||||
|
<description>
|
||||||
|
When listing open files, the maximum number of open files that will be
|
||||||
|
returned in a single batch. Fetching the list incrementally in batches
|
||||||
|
improves namenode performance.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.edekcacheloader.interval.ms</name>
|
<name>dfs.namenode.edekcacheloader.interval.ms</name>
|
||||||
<value>1000</value>
|
<value>1000</value>
|
||||||
|
|
|
@ -352,6 +352,7 @@ Usage:
|
||||||
[-getDatanodeInfo <datanode_host:ipc_port>]
|
[-getDatanodeInfo <datanode_host:ipc_port>]
|
||||||
[-evictWriters <datanode_host:ipc_port>]
|
[-evictWriters <datanode_host:ipc_port>]
|
||||||
[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]
|
[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]
|
||||||
|
[-listOpenFiles]
|
||||||
[-help [cmd]]
|
[-help [cmd]]
|
||||||
|
|
||||||
| COMMAND\_OPTION | Description |
|
| COMMAND\_OPTION | Description |
|
||||||
|
@ -387,6 +388,7 @@ Usage:
|
||||||
| `-evictWriters` \<datanode\_host:ipc\_port\> | Make the datanode evict all clients that are writing a block. This is useful if decommissioning is hung due to slow writers. |
|
| `-evictWriters` \<datanode\_host:ipc\_port\> | Make the datanode evict all clients that are writing a block. This is useful if decommissioning is hung due to slow writers. |
|
||||||
| `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. |
|
| `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. |
|
||||||
| `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. |
|
| `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. |
|
||||||
|
| `-listOpenFiles` | List all open files currently managed by the NameNode along with client name and client machine accessing them. |
|
||||||
| `-help` [cmd] | Displays help for the given command or all commands if none is specified. |
|
| `-help` [cmd] | Displays help for the given command or all commands if none is specified. |
|
||||||
|
|
||||||
Runs a HDFS dfsadmin client.
|
Runs a HDFS dfsadmin client.
|
||||||
|
|
|
@ -62,8 +62,10 @@ import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
@ -1775,8 +1777,8 @@ public class DFSTestUtil {
|
||||||
GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
|
GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Change the length of a block at datanode dnIndex
|
* Change the length of a block at datanode dnIndex.
|
||||||
*/
|
*/
|
||||||
public static boolean changeReplicaLength(MiniDFSCluster cluster,
|
public static boolean changeReplicaLength(MiniDFSCluster cluster,
|
||||||
ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
|
ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
|
||||||
|
@ -2055,4 +2057,38 @@ public class DFSTestUtil {
|
||||||
assertFalse("File in trash : " + trashPath, fs.exists(trashPath));
|
assertFalse("File in trash : " + trashPath, fs.exists(trashPath));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Map<Path, FSDataOutputStream> createOpenFiles(FileSystem fs,
|
||||||
|
String filePrefix, int numFilesToCreate) throws IOException {
|
||||||
|
final Map<Path, FSDataOutputStream> filesCreated = new HashMap<>();
|
||||||
|
final byte[] buffer = new byte[(int) (1024 * 1.75)];
|
||||||
|
final Random rand = new Random(0xFEED0BACL);
|
||||||
|
for (int i = 0; i < numFilesToCreate; i++) {
|
||||||
|
Path file = new Path("/" + filePrefix + "-" + i);
|
||||||
|
FSDataOutputStream stm = fs.create(file, true, 1024, (short) 1, 1024);
|
||||||
|
rand.nextBytes(buffer);
|
||||||
|
stm.write(buffer);
|
||||||
|
filesCreated.put(file, stm);
|
||||||
|
}
|
||||||
|
return filesCreated;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HashSet<Path> closeOpenFiles(
|
||||||
|
HashMap<Path, FSDataOutputStream> openFilesMap,
|
||||||
|
int numFilesToClose) throws IOException {
|
||||||
|
HashSet<Path> closedFiles = new HashSet<>();
|
||||||
|
for (Iterator<Entry<Path, FSDataOutputStream>> it =
|
||||||
|
openFilesMap.entrySet().iterator(); it.hasNext();) {
|
||||||
|
Entry<Path, FSDataOutputStream> entry = it.next();
|
||||||
|
LOG.info("Closing file: " + entry.getKey());
|
||||||
|
entry.getValue().close();
|
||||||
|
closedFiles.add(entry.getKey());
|
||||||
|
it.remove();
|
||||||
|
numFilesToClose--;
|
||||||
|
if (numFilesToClose == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return closedFiles;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,12 +18,14 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
@ -31,11 +33,14 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
|
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
|
||||||
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileSystemTestHelper;
|
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -49,11 +54,15 @@ public class TestHdfsAdmin {
|
||||||
private static final Path TEST_PATH = new Path("/test");
|
private static final Path TEST_PATH = new Path("/test");
|
||||||
private static final short REPL = 1;
|
private static final short REPL = 1;
|
||||||
private static final int SIZE = 128;
|
private static final int SIZE = 128;
|
||||||
|
private static final int OPEN_FILES_BATCH_SIZE = 5;
|
||||||
private final Configuration conf = new Configuration();
|
private final Configuration conf = new Configuration();
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUpCluster() throws IOException {
|
public void setUpCluster() throws IOException {
|
||||||
|
conf.setLong(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES,
|
||||||
|
OPEN_FILES_BATCH_SIZE);
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
}
|
}
|
||||||
|
@ -205,4 +214,54 @@ public class TestHdfsAdmin {
|
||||||
Assert.assertNotNull("should not return null for an encrypted cluster",
|
Assert.assertNotNull("should not return null for an encrypted cluster",
|
||||||
hdfsAdmin.getKeyProvider());
|
hdfsAdmin.getKeyProvider());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 120000L)
|
||||||
|
public void testListOpenFiles() throws IOException {
|
||||||
|
HashSet<Path> closedFileSet = new HashSet<>();
|
||||||
|
HashMap<Path, FSDataOutputStream> openFileMap = new HashMap<>();
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
verifyOpenFiles(closedFileSet, openFileMap);
|
||||||
|
|
||||||
|
int numClosedFiles = OPEN_FILES_BATCH_SIZE * 4;
|
||||||
|
int numOpenFiles = (OPEN_FILES_BATCH_SIZE * 3) + 1;
|
||||||
|
for (int i = 0; i < numClosedFiles; i++) {
|
||||||
|
Path filePath = new Path("/closed-file-" + i);
|
||||||
|
DFSTestUtil.createFile(fs, filePath, SIZE, REPL, 0);
|
||||||
|
closedFileSet.add(filePath);
|
||||||
|
}
|
||||||
|
verifyOpenFiles(closedFileSet, openFileMap);
|
||||||
|
|
||||||
|
openFileMap.putAll(
|
||||||
|
DFSTestUtil.createOpenFiles(fs, "open-file-1", numOpenFiles));
|
||||||
|
verifyOpenFiles(closedFileSet, openFileMap);
|
||||||
|
|
||||||
|
closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFileMap,
|
||||||
|
openFileMap.size() / 2));
|
||||||
|
verifyOpenFiles(closedFileSet, openFileMap);
|
||||||
|
|
||||||
|
openFileMap.putAll(
|
||||||
|
DFSTestUtil.createOpenFiles(fs, "open-file-2", 10));
|
||||||
|
verifyOpenFiles(closedFileSet, openFileMap);
|
||||||
|
|
||||||
|
while(openFileMap.size() > 0) {
|
||||||
|
closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFileMap, 1));
|
||||||
|
verifyOpenFiles(closedFileSet, openFileMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyOpenFiles(HashSet<Path> closedFiles,
|
||||||
|
HashMap<Path, FSDataOutputStream> openFileMap) throws IOException {
|
||||||
|
HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
|
||||||
|
HashSet<Path> openFiles = new HashSet<>(openFileMap.keySet());
|
||||||
|
RemoteIterator<OpenFileEntry> openFilesRemoteItr =
|
||||||
|
hdfsAdmin.listOpenFiles();
|
||||||
|
while (openFilesRemoteItr.hasNext()) {
|
||||||
|
String filePath = openFilesRemoteItr.next().getFilePath();
|
||||||
|
assertFalse(filePath + " should not be listed under open files!",
|
||||||
|
closedFiles.contains(filePath));
|
||||||
|
assertTrue(filePath + " is not listed under open files!",
|
||||||
|
openFiles.remove(new Path(filePath)));
|
||||||
|
}
|
||||||
|
assertTrue("Not all open files are listed!", openFiles.isEmpty());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,234 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.HAUtil;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify open files listing.
|
||||||
|
*/
|
||||||
|
public class TestListOpenFiles {
|
||||||
|
private static final int NUM_DATA_NODES = 3;
|
||||||
|
private static final int BATCH_SIZE = 5;
|
||||||
|
private static MiniDFSCluster cluster = null;
|
||||||
|
private static DistributedFileSystem fs = null;
|
||||||
|
private static NamenodeProtocols nnRpc = null;
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestListOpenFiles.class);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
||||||
|
conf.setLong(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE);
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).
|
||||||
|
numDataNodes(NUM_DATA_NODES).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
fs = cluster.getFileSystem();
|
||||||
|
nnRpc = cluster.getNameNodeRpc();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
if (fs != null) {
|
||||||
|
fs.close();
|
||||||
|
}
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 120000L)
|
||||||
|
public void testListOpenFilesViaNameNodeRPC() throws Exception {
|
||||||
|
HashMap<Path, FSDataOutputStream> openFiles = new HashMap<>();
|
||||||
|
createFiles(fs, "closed", 10);
|
||||||
|
verifyOpenFiles(openFiles);
|
||||||
|
|
||||||
|
BatchedEntries<OpenFileEntry> openFileEntryBatchedEntries =
|
||||||
|
nnRpc.listOpenFiles(0);
|
||||||
|
assertTrue("Open files list should be empty!",
|
||||||
|
openFileEntryBatchedEntries.size() == 0);
|
||||||
|
|
||||||
|
openFiles.putAll(
|
||||||
|
DFSTestUtil.createOpenFiles(fs, "open-1", 1));
|
||||||
|
verifyOpenFiles(openFiles);
|
||||||
|
|
||||||
|
openFiles.putAll(
|
||||||
|
DFSTestUtil.createOpenFiles(fs, "open-2",
|
||||||
|
(BATCH_SIZE * 2 + BATCH_SIZE / 2)));
|
||||||
|
verifyOpenFiles(openFiles);
|
||||||
|
|
||||||
|
DFSTestUtil.closeOpenFiles(openFiles, openFiles.size() / 2);
|
||||||
|
verifyOpenFiles(openFiles);
|
||||||
|
|
||||||
|
openFiles.putAll(
|
||||||
|
DFSTestUtil.createOpenFiles(fs, "open-3", (BATCH_SIZE * 5)));
|
||||||
|
verifyOpenFiles(openFiles);
|
||||||
|
|
||||||
|
while(openFiles.size() > 0) {
|
||||||
|
DFSTestUtil.closeOpenFiles(openFiles, 1);
|
||||||
|
verifyOpenFiles(openFiles);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles)
|
||||||
|
throws IOException {
|
||||||
|
HashSet<Path> remainingFiles = new HashSet<>(openFiles.keySet());
|
||||||
|
OpenFileEntry lastEntry = null;
|
||||||
|
BatchedEntries<OpenFileEntry> batchedEntries;
|
||||||
|
do {
|
||||||
|
if (lastEntry == null) {
|
||||||
|
batchedEntries = nnRpc.listOpenFiles(0);
|
||||||
|
} else {
|
||||||
|
batchedEntries = nnRpc.listOpenFiles(lastEntry.getId());
|
||||||
|
}
|
||||||
|
assertTrue("Incorrect open files list size!",
|
||||||
|
batchedEntries.size() <= BATCH_SIZE);
|
||||||
|
for (int i = 0; i < batchedEntries.size(); i++) {
|
||||||
|
lastEntry = batchedEntries.get(i);
|
||||||
|
String filePath = lastEntry.getFilePath();
|
||||||
|
LOG.info("OpenFile: " + filePath);
|
||||||
|
assertTrue("Unexpected open file: " + filePath,
|
||||||
|
remainingFiles.remove(new Path(filePath)));
|
||||||
|
}
|
||||||
|
} while (batchedEntries.hasMore());
|
||||||
|
assertTrue(remainingFiles.size() + " open files not listed!",
|
||||||
|
remainingFiles.size() == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<Path> createFiles(FileSystem fileSystem, String fileNamePrefix,
|
||||||
|
int numFilesToCreate) throws IOException {
|
||||||
|
HashSet<Path> files = new HashSet<>();
|
||||||
|
for (int i = 0; i < numFilesToCreate; i++) {
|
||||||
|
Path filePath = new Path(fileNamePrefix + "-" + i);
|
||||||
|
DFSTestUtil.createFile(fileSystem, filePath, 1024, (short) 3, 1);
|
||||||
|
}
|
||||||
|
return files;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify dfsadmin -listOpenFiles command in HA mode.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testListOpenFilesInHA() throws Exception {
|
||||||
|
fs.close();
|
||||||
|
cluster.shutdown();
|
||||||
|
HdfsConfiguration haConf = new HdfsConfiguration();
|
||||||
|
haConf.setLong(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE);
|
||||||
|
MiniDFSCluster haCluster =
|
||||||
|
new MiniDFSCluster.Builder(haConf)
|
||||||
|
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||||
|
.numDataNodes(0)
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
HATestUtil.setFailoverConfigurations(haCluster, haConf);
|
||||||
|
FileSystem fileSystem = HATestUtil.configureFailoverFs(haCluster, haConf);
|
||||||
|
|
||||||
|
List<ClientProtocol> namenodes =
|
||||||
|
HAUtil.getProxiesForAllNameNodesInNameservice(haConf,
|
||||||
|
HATestUtil.getLogicalHostname(haCluster));
|
||||||
|
haCluster.transitionToActive(0);
|
||||||
|
assertTrue(HAUtil.isAtLeastOneActive(namenodes));
|
||||||
|
|
||||||
|
final byte[] data = new byte[1024];
|
||||||
|
ThreadLocalRandom.current().nextBytes(data);
|
||||||
|
DFSTestUtil.createOpenFiles(fileSystem, "ha-open-file",
|
||||||
|
((BATCH_SIZE * 4) + (BATCH_SIZE / 2)));
|
||||||
|
|
||||||
|
final DFSAdmin dfsAdmin = new DFSAdmin(haConf);
|
||||||
|
final AtomicBoolean failoverCompleted = new AtomicBoolean(false);
|
||||||
|
final AtomicBoolean listOpenFilesError = new AtomicBoolean(false);
|
||||||
|
final int listingIntervalMsec = 250;
|
||||||
|
Thread clientThread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while(!failoverCompleted.get()) {
|
||||||
|
try {
|
||||||
|
assertEquals(0, ToolRunner.run(dfsAdmin,
|
||||||
|
new String[] {"-listOpenFiles"}));
|
||||||
|
// Sleep for some time to avoid
|
||||||
|
// flooding logs with listing.
|
||||||
|
Thread.sleep(listingIntervalMsec);
|
||||||
|
} catch (Exception e) {
|
||||||
|
listOpenFilesError.set(true);
|
||||||
|
LOG.info("Error listing open files: ", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
clientThread.start();
|
||||||
|
|
||||||
|
// Let client list open files for few
|
||||||
|
// times before the NN failover.
|
||||||
|
Thread.sleep(listingIntervalMsec * 2);
|
||||||
|
|
||||||
|
LOG.info("Shutting down Active NN0!");
|
||||||
|
haCluster.shutdownNameNode(0);
|
||||||
|
LOG.info("Transitioning NN1 to Active!");
|
||||||
|
haCluster.transitionToActive(1);
|
||||||
|
failoverCompleted.set(true);
|
||||||
|
|
||||||
|
assertEquals(0, ToolRunner.run(dfsAdmin,
|
||||||
|
new String[] {"-listOpenFiles"}));
|
||||||
|
assertFalse("Client Error!", listOpenFilesError.get());
|
||||||
|
|
||||||
|
clientThread.join();
|
||||||
|
} finally {
|
||||||
|
if (haCluster != null) {
|
||||||
|
haCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,9 +23,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.commons.lang.text.StrBuilder;
|
import org.apache.commons.lang.text.StrBuilder;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.ReconfigurationUtil;
|
import org.apache.hadoop.conf.ReconfigurationUtil;
|
||||||
import org.apache.hadoop.fs.ChecksumException;
|
import org.apache.hadoop.fs.ChecksumException;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
@ -54,6 +57,8 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Scanner;
|
import java.util.Scanner;
|
||||||
|
|
||||||
|
@ -74,6 +79,7 @@ import static org.mockito.Mockito.when;
|
||||||
* set/clrSpaceQuote are tested in {@link org.apache.hadoop.hdfs.TestQuota}.
|
* set/clrSpaceQuote are tested in {@link org.apache.hadoop.hdfs.TestQuota}.
|
||||||
*/
|
*/
|
||||||
public class TestDFSAdmin {
|
public class TestDFSAdmin {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestDFSAdmin.class);
|
||||||
private Configuration conf = null;
|
private Configuration conf = null;
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
private DFSAdmin admin;
|
private DFSAdmin admin;
|
||||||
|
@ -478,6 +484,75 @@ public class TestDFSAdmin {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000L)
|
||||||
|
public void testListOpenFiles() throws Exception {
|
||||||
|
redirectStream();
|
||||||
|
|
||||||
|
final Configuration dfsConf = new HdfsConfiguration();
|
||||||
|
dfsConf.setInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
|
||||||
|
dfsConf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||||
|
dfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 5);
|
||||||
|
final Path baseDir = new Path(
|
||||||
|
PathUtils.getTestDir(getClass()).getAbsolutePath(),
|
||||||
|
GenericTestUtils.getMethodName());
|
||||||
|
dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString());
|
||||||
|
|
||||||
|
final int numDataNodes = 3;
|
||||||
|
final int numClosedFiles = 25;
|
||||||
|
final int numOpenFiles = 15;
|
||||||
|
|
||||||
|
try(MiniDFSCluster miniCluster = new MiniDFSCluster
|
||||||
|
.Builder(dfsConf)
|
||||||
|
.numDataNodes(numDataNodes).build()) {
|
||||||
|
final short replFactor = 1;
|
||||||
|
final long fileLength = 512L;
|
||||||
|
final FileSystem fs = miniCluster.getFileSystem();
|
||||||
|
final Path parentDir = new Path("/tmp/files/");
|
||||||
|
|
||||||
|
fs.mkdirs(parentDir);
|
||||||
|
HashSet<Path> closedFileSet = new HashSet<>();
|
||||||
|
for (int i = 0; i < numClosedFiles; i++) {
|
||||||
|
Path file = new Path(parentDir, "closed-file-" + i);
|
||||||
|
DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
|
||||||
|
closedFileSet.add(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
HashMap<Path, FSDataOutputStream> openFilesMap = new HashMap<>();
|
||||||
|
for (int i = 0; i < numOpenFiles; i++) {
|
||||||
|
Path file = new Path(parentDir, "open-file-" + i);
|
||||||
|
DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
|
||||||
|
FSDataOutputStream outputStream = fs.append(file);
|
||||||
|
openFilesMap.put(file, outputStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf);
|
||||||
|
assertEquals(0, ToolRunner.run(dfsAdmin,
|
||||||
|
new String[]{"-listOpenFiles"}));
|
||||||
|
verifyOpenFilesListing(closedFileSet, openFilesMap);
|
||||||
|
|
||||||
|
for (int count = 0; count < numOpenFiles; count++) {
|
||||||
|
closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFilesMap, 1));
|
||||||
|
resetStream();
|
||||||
|
assertEquals(0, ToolRunner.run(dfsAdmin,
|
||||||
|
new String[]{"-listOpenFiles"}));
|
||||||
|
verifyOpenFilesListing(closedFileSet, openFilesMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyOpenFilesListing(HashSet<Path> closedFileSet,
|
||||||
|
HashMap<Path, FSDataOutputStream> openFilesMap) {
|
||||||
|
final String outStr = scanIntoString(out);
|
||||||
|
LOG.info("dfsadmin -listOpenFiles output: \n" + out);
|
||||||
|
for (Path closedFilePath : closedFileSet) {
|
||||||
|
assertThat(outStr, not(containsString(closedFilePath.toString() + "\n")));
|
||||||
|
}
|
||||||
|
for (Path openFilePath : openFilesMap.keySet()) {
|
||||||
|
assertThat(outStr, is(containsString(openFilePath.toString() + "\n")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void verifyNodesAndCorruptBlocks(
|
private void verifyNodesAndCorruptBlocks(
|
||||||
final int numDn,
|
final int numDn,
|
||||||
final int numLiveDn,
|
final int numLiveDn,
|
||||||
|
|
Loading…
Reference in New Issue