From 710555e1503c9690852fb071ace53c06c3e793f1 Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Tue, 2 Jan 2018 14:59:36 -0800 Subject: [PATCH] HDFS-11847. Enhance dfsadmin listOpenFiles command to list files blocking datanode decommissioning. --- .../org/apache/hadoop/hdfs/DFSClient.java | 16 +- .../hadoop/hdfs/DistributedFileSystem.java | 8 + .../apache/hadoop/hdfs/client/HdfsAdmin.java | 7 + .../hadoop/hdfs/protocol/ClientProtocol.java | 16 ++ .../hdfs/protocol/OpenFilesIterator.java | 36 +++- .../ClientNamenodeProtocolTranslatorPB.java | 18 +- .../hdfs/protocolPB/PBHelperClient.java | 26 +++ .../main/proto/ClientNamenodeProtocol.proto | 7 + ...amenodeProtocolServerSideTranslatorPB.java | 7 +- .../server/blockmanagement/BlockManager.java | 2 +- .../blockmanagement/DatanodeAdminManager.java | 25 ++- .../blockmanagement/DatanodeDescriptor.java | 24 ++- .../federation/router/RouterRpcServer.java | 10 +- .../hdfs/server/namenode/FSNamesystem.java | 49 ++++- .../server/namenode/NameNodeRpcServer.java | 10 +- .../apache/hadoop/hdfs/tools/DFSAdmin.java | 36 +++- .../src/site/markdown/HDFSCommands.md | 2 +- .../hadoop/hdfs/AdminStatesBaseTest.java | 18 +- .../apache/hadoop/hdfs/TestDecommission.java | 177 ++++++++++++++++++ .../org/apache/hadoop/hdfs/TestHdfsAdmin.java | 4 +- .../blockmanagement/BlockManagerTestUtil.java | 12 +- .../server/namenode/TestLeaseManager.java | 48 ++--- .../server/namenode/TestListOpenFiles.java | 27 ++- 23 files changed, 522 insertions(+), 63 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index ff613849863..83c3b946bb2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -131,6 +131,7 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.ReencryptionStatusIterator; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; @@ -3026,8 +3027,21 @@ Tracer getTracer() { * * @throws IOException */ + @Deprecated public RemoteIterator listOpenFiles() throws IOException { checkOpen(); - return new OpenFilesIterator(namenode, tracer); + return listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); + } + + /** + * Get a remote iterator to the open files list by type, managed by NameNode. 
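   * ALL_OPEN_FILES keeps the previous behaviour and returns every file the
   * LeaseManager currently tracks as under construction; BLOCKING_DECOMMISSION
   * narrows the listing to open files whose blocks are still under-replicated
   * on DataNodes that are decommissioning or entering maintenance.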
+ * + * @param openFilesTypes + * @throws IOException + */ + public RemoteIterator listOpenFiles( + EnumSet openFilesTypes) throws IOException { + checkOpen(); + return new OpenFilesIterator(namenode, tracer, openFilesTypes); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index cecd9d16311..54b428e12d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; @@ -2978,10 +2979,17 @@ public HdfsDataOutputStreamBuilder createFile(Path path) { *
<p>
* This method can only be called by HDFS superusers. */ + @Deprecated public RemoteIterator listOpenFiles() throws IOException { return dfs.listOpenFiles(); } + public RemoteIterator listOpenFiles( + EnumSet openFilesTypes) throws IOException { + return dfs.listOpenFiles(openFilesTypes); + } + + /** * Create a {@link HdfsDataOutputStreamBuilder} to append a file on DFS. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java index 91161674b5c..e6200397862 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.security.AccessControlException; @@ -652,8 +653,14 @@ private void provisionEZTrash(Path path) throws IOException { *
<p>
* This method can only be called by HDFS superusers. */ + @Deprecated public RemoteIterator listOpenFiles() throws IOException { return dfs.listOpenFiles(); } + public RemoteIterator listOpenFiles( + EnumSet openFilesTypes) throws IOException { + return dfs.listOpenFiles(openFilesTypes); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index f61ec75a6fd..80e053cf932 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; @@ -1669,5 +1670,20 @@ AddErasureCodingPolicyResponse[] addErasureCodingPolicies( * @throws IOException */ @Idempotent + @Deprecated BatchedEntries listOpenFiles(long prevId) throws IOException; + + /** + * List open files in the system in batches. INode id is the cursor and the + * open files returned in a batch will have their INode ids greater than + * the cursor INode id. Open files can only be requested by super user and + * the the list across batches are not atomic. + * + * @param prevId the cursor INode id. + * @param openFilesTypes types to filter the open files + * @throws IOException + */ + @Idempotent + BatchedEntries listOpenFiles(long prevId, + EnumSet openFilesTypes) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java index c24e58591e4..d113d65c094 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.protocol; import java.io.IOException; +import java.util.EnumSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -35,20 +36,51 @@ @InterfaceStability.Evolving public class OpenFilesIterator extends BatchedRemoteIterator { + + /** + * Open file types to filter the results. 
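   * As an illustrative usage sketch (editor's example, not part of this patch;
   * {@code conf} is assumed to be a Configuration pointing at the target
   * cluster), a client could restrict the listing to the files blocking
   * decommission through the HdfsAdmin API added in this change:
   * <pre>{@code
   * HdfsAdmin admin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
   * RemoteIterator<OpenFileEntry> files =
   *     admin.listOpenFiles(EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION));
   * while (files.hasNext()) {
   *   System.out.println(files.next().getFilePath());
   * }
   * }</pre>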
+ */ + public enum OpenFilesType { + + ALL_OPEN_FILES((short) 0x01), + BLOCKING_DECOMMISSION((short) 0x02); + + private final short mode; + OpenFilesType(short mode) { + this.mode = mode; + } + + public short getMode() { + return mode; + } + + public static OpenFilesType valueOf(short num) { + for (OpenFilesType type : OpenFilesType.values()) { + if (type.getMode() == num) { + return type; + } + } + return null; + } + } + private final ClientProtocol namenode; private final Tracer tracer; + private final EnumSet types; - public OpenFilesIterator(ClientProtocol namenode, Tracer tracer) { + public OpenFilesIterator(ClientProtocol namenode, Tracer tracer, + EnumSet types) { super(HdfsConstants.GRANDFATHER_INODE_ID); this.namenode = namenode; this.tracer = tracer; + this.types = types; } @Override public BatchedEntries makeRequest(Long prevId) throws IOException { try (TraceScope ignored = tracer.newScope("listOpenFiles")) { - return namenode.listOpenFiles(prevId); + return namenode.listOpenFiles(prevId, types); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index aef7c1e9dec..c2973020db3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -74,6 +74,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; @@ -1846,13 +1847,24 @@ public QuotaUsage getQuotaUsage(String path) throws IOException { } } + @Deprecated @Override public BatchedEntries listOpenFiles(long prevId) throws IOException { - ListOpenFilesRequestProto req = - ListOpenFilesRequestProto.newBuilder().setId(prevId).build(); + return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); + } + + @Override + public BatchedEntries listOpenFiles(long prevId, + EnumSet openFilesTypes) throws IOException { + ListOpenFilesRequestProto.Builder req = + ListOpenFilesRequestProto.newBuilder().setId(prevId); + if (openFilesTypes != null) { + req.addAllTypes(PBHelperClient.convertOpenFileTypes(openFilesTypes)); + } try { - ListOpenFilesResponseProto response = rpcProxy.listOpenFiles(null, req); + ListOpenFilesResponseProto response = + rpcProxy.listOpenFiles(null, req.build()); List openFileEntries = Lists.newArrayListWithCapacity(response.getEntriesCount()); for (OpenFilesBatchResponseProto p : response.getEntriesList()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index c0df3259fea..8d25d2c9eb7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -94,6 +94,7 @@ import 
org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; @@ -128,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsReplicatedBlockStatsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesTypeProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; @@ -3078,4 +3080,28 @@ public static List convertAddBlockFlags( } return ret; } + + public static EnumSet convertOpenFileTypes( + List openFilesTypeProtos) { + EnumSet types = EnumSet.noneOf(OpenFilesType.class); + for (OpenFilesTypeProto af : openFilesTypeProtos) { + OpenFilesType type = OpenFilesType.valueOf((short)af.getNumber()); + if (type != null) { + types.add(type); + } + } + return types; + } + + public static List convertOpenFileTypes( + EnumSet types) { + List typeProtos = new ArrayList<>(); + for (OpenFilesType type : types) { + OpenFilesTypeProto typeProto = OpenFilesTypeProto.valueOf(type.getMode()); + if (typeProto != null) { + typeProtos.add(typeProto); + } + } + return typeProtos; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 6db6ad0804c..216821ae0dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -777,8 +777,14 @@ message GetEditsFromTxidResponseProto { required EventsListProto eventsList = 1; } +enum OpenFilesTypeProto { + ALL_OPEN_FILES = 1; + BLOCKING_DECOMMISSION = 2; +} + message ListOpenFilesRequestProto { required int64 id = 1; + repeated OpenFilesTypeProto types = 2; } message OpenFilesBatchResponseProto { @@ -791,6 +797,7 @@ message OpenFilesBatchResponseProto { message ListOpenFilesResponseProto { repeated OpenFilesBatchResponseProto entries = 1; required bool hasMore = 2; + repeated OpenFilesTypeProto types = 3; } service ClientNamenodeProtocol { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index dee3f25c283..d34032d5ece 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import 
org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -1807,13 +1808,17 @@ public GetQuotaUsageResponseProto getQuotaUsage( public ListOpenFilesResponseProto listOpenFiles(RpcController controller, ListOpenFilesRequestProto req) throws ServiceException { try { - BatchedEntries entries = server.listOpenFiles(req.getId()); + EnumSet openFilesTypes = + PBHelperClient.convertOpenFileTypes(req.getTypesList()); + BatchedEntries entries = server.listOpenFiles(req.getId(), + openFilesTypes); ListOpenFilesResponseProto.Builder builder = ListOpenFilesResponseProto.newBuilder(); builder.setHasMore(entries.hasMore()); for (int i = 0; i < entries.size(); i++) { builder.addEntries(PBHelperClient.convert(entries.get(i))); } + builder.addAllTypes(req.getTypesList()); return builder.build(); } catch (IOException e) { throw new ServiceException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index ae04aace3b3..757016b216a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2255,7 +2255,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, * If there were any reconstruction requests that timed out, reap them * and put them back into the neededReconstruction queue */ - private void processPendingReconstructions() { + void processPendingReconstructions() { BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks(); if (timedOutItems != null) { namesystem.writeLock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java index 928036af869..e3385912612 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java @@ -36,10 +36,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.namenode.INode; +import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeId; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.util.CyclicIteration; +import org.apache.hadoop.hdfs.util.LightWeightHashSet; +import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.util.ChunkedArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -649,8 +653,10 @@ private void processBlocksInternal( boolean pruneReliableBlocks) { boolean firstReplicationLog = true; // Low redundancy in UC Blocks only - int lowRedundancyInOpenFiles = 0; - // All low redundancy blocks. Includes lowRedundancyInOpenFiles. 
+ int lowRedundancyBlocksInOpenFiles = 0; + LightWeightHashSet lowRedundancyOpenFiles = + new LightWeightLinkedSet<>(); + // All low redundancy blocks. Includes lowRedundancyOpenFiles. int lowRedundancyBlocks = 0; // All maintenance and decommission replicas. int outOfServiceOnlyReplicas = 0; @@ -737,15 +743,24 @@ private void processBlocksInternal( // Update various counts lowRedundancyBlocks++; if (bc.isUnderConstruction()) { - lowRedundancyInOpenFiles++; + INode ucFile = namesystem.getFSDirectory().getInode(bc.getId()); + if(!(ucFile instanceof INodeFile) || + !ucFile.asFile().isUnderConstruction()) { + LOG.warn("File " + ucFile.getLocalName() + " is not under " + + "construction. Skipping add to low redundancy open files!"); + } else { + lowRedundancyBlocksInOpenFiles++; + lowRedundancyOpenFiles.add(ucFile.getId()); + } } if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) { outOfServiceOnlyReplicas++; } } - datanode.getLeavingServiceStatus().set(lowRedundancyInOpenFiles, - lowRedundancyBlocks, outOfServiceOnlyReplicas); + datanode.getLeavingServiceStatus().set(lowRedundancyBlocksInOpenFiles, + lowRedundancyOpenFiles, lowRedundancyBlocks, + outOfServiceOnlyReplicas); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index ee5aca141d8..e2b52f02ca4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.util.EnumCounters; import org.apache.hadoop.hdfs.util.LightWeightHashSet; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.util.IntrusiveCollection; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -795,17 +796,21 @@ public boolean equals(Object obj) { /** Leaving service status. 
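   * With this change the status additionally records the inode ids of open
   * files that still have low-redundancy blocks on this node: the set is
   * populated by DatanodeAdminManager#processBlocksInternal and read back via
   * {@code getLeavingServiceStatus().getOpenFiles()} when the NameNode serves
   * listOpenFiles with the BLOCKING_DECOMMISSION filter.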
*/ public class LeavingServiceStatus { private int underReplicatedBlocks; + private int underReplicatedBlocksInOpenFiles; private int outOfServiceOnlyReplicas; - private int underReplicatedInOpenFiles; + private LightWeightHashSet underReplicatedOpenFiles = + new LightWeightLinkedSet<>(); private long startTime; - synchronized void set(int underRepInOpenFiles, int underRepBlocks, - int outOfServiceOnlyRep) { + synchronized void set(int lowRedundancyBlocksInOpenFiles, + LightWeightHashSet underRepInOpenFiles, + int underRepBlocks, int outOfServiceOnlyRep) { if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return; } - underReplicatedInOpenFiles = underRepInOpenFiles; + underReplicatedOpenFiles = underRepInOpenFiles; underReplicatedBlocks = underRepBlocks; + underReplicatedBlocksInOpenFiles = lowRedundancyBlocksInOpenFiles; outOfServiceOnlyReplicas = outOfServiceOnlyRep; } @@ -828,7 +833,14 @@ public synchronized int getUnderReplicatedInOpenFiles() { if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return 0; } - return underReplicatedInOpenFiles; + return underReplicatedBlocksInOpenFiles; + } + /** @return the collection of under-replicated blocks in open files */ + public synchronized LightWeightHashSet getOpenFiles() { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { + return new LightWeightLinkedSet<>(); + } + return underReplicatedOpenFiles; } /** Set start time */ public synchronized void setStartTime(long time) { @@ -844,7 +856,7 @@ public synchronized long getStartTime() { } return startTime; } - } // End of class DecommissioningStatus + } // End of class LeavingServiceStatus /** * Set the flag to indicate if this datanode is disallowed from communicating diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index a8c2257f8e2..d3d5959e797 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; @@ -1913,9 +1914,16 @@ public ReplicatedBlockStats getReplicatedBlockStats() throws IOException { return null; } + @Deprecated @Override - public BatchedEntries listOpenFiles(long arg0) + public BatchedEntries listOpenFiles(long prevId) throws IOException { + return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); + } + + @Override + public BatchedEntries listOpenFiles(long prevId, + EnumSet openFilesTypes) throws IOException { checkOperation(OperationCategory.READ, false); return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 558fef72fe1..97423cbea3e 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -90,6 +90,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; @@ -272,6 +273,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import org.apache.hadoop.hdfs.util.LightWeightHashSet; import org.apache.hadoop.hdfs.web.JsonUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; @@ -1743,12 +1745,14 @@ private void metaSave(PrintWriter out) { * the open files returned in a batch will have their INode ids greater than * this cursor. Open files can only be requested by super user and the the * list across batches does not represent a consistent view of all open files. + * TODO: HDFS-12969 - to report open files by type. * * @param prevId the cursor INode id. + * @param openFilesTypes * @throws IOException */ - BatchedListEntries listOpenFiles(long prevId) - throws IOException { + BatchedListEntries listOpenFiles(long prevId, + EnumSet openFilesTypes) throws IOException { final String operationName = "listOpenFiles"; checkSuperuserPrivilege(); checkOperation(OperationCategory.READ); @@ -1756,7 +1760,16 @@ BatchedListEntries listOpenFiles(long prevId) BatchedListEntries batchedListEntries; try { checkOperation(OperationCategory.READ); - batchedListEntries = leaseManager.getUnderConstructionFiles(prevId); + if(openFilesTypes.contains(OpenFilesType.ALL_OPEN_FILES)) { + batchedListEntries = leaseManager.getUnderConstructionFiles(prevId); + } else { + if(openFilesTypes.contains(OpenFilesType.BLOCKING_DECOMMISSION)) { + batchedListEntries = getFilesBlockingDecom(prevId); + } else { + throw new IllegalArgumentException("Unknown OpenFileType: " + + openFilesTypes); + } + } } catch (AccessControlException e) { logAuditEvent(false, operationName, null); throw e; @@ -1767,6 +1780,36 @@ BatchedListEntries listOpenFiles(long prevId) return batchedListEntries; } + public BatchedListEntries getFilesBlockingDecom(long prevId) { + assert hasReadLock(); + final List openFileEntries = Lists.newArrayList(); + LightWeightHashSet openFileIds = new LightWeightHashSet<>(); + for (DatanodeDescriptor dataNode : + blockManager.getDatanodeManager().getDatanodes()) { + for (long ucFileId : dataNode.getLeavingServiceStatus().getOpenFiles()) { + INode ucFile = getFSDirectory().getInode(ucFileId); + if (ucFile == null || ucFileId <= prevId || + openFileIds.contains(ucFileId)) { + // probably got deleted or + // part of previous batch or + // already part of the current batch + continue; + } + Preconditions.checkState(ucFile instanceof INodeFile); + openFileIds.add(ucFileId); + INodeFile inodeFile = ucFile.asFile(); + openFileEntries.add(new OpenFileEntry( + inodeFile.getId(), inodeFile.getFullPathName(), + inodeFile.getFileUnderConstructionFeature().getClientName(), + inodeFile.getFileUnderConstructionFeature().getClientMachine())); + if (openFileIds.size() >= this.maxListOpenFilesResponses) { + return 
new BatchedListEntries<>(openFileEntries, true); + } + } + } + return new BatchedListEntries<>(openFileEntries, false); + } + private String metaSaveAsString() { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 283547e5f11..b44aaf10b62 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -114,6 +114,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; @@ -1319,11 +1320,18 @@ public void metaSave(String filename) throws IOException { namesystem.metaSave(filename); } + @Deprecated @Override // ClientProtocol public BatchedEntries listOpenFiles(long prevId) throws IOException { + return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); + } + + @Override // ClientProtocol + public BatchedEntries listOpenFiles(long prevId, + EnumSet openFilesTypes) throws IOException { checkNNStartup(); - return namesystem.listOpenFiles(prevId); + return namesystem.listOpenFiles(prevId, openFilesTypes); } @Override // ClientProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index 9268f388710..7f79b1c653b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -29,6 +29,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol; @@ -462,7 +464,7 @@ static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOExcep "\t[-getDatanodeInfo ]\n" + "\t[-metasave filename]\n" + "\t[-triggerBlockReport [-incremental] ]\n" + - "\t[-listOpenFiles]\n" + + "\t[-listOpenFiles [-blockingDecommission]]\n" + "\t[-help [cmd]]\n"; /** @@ -913,8 +915,21 @@ public int refreshNodes() throws IOException { * Usage: hdfs dfsadmin -listOpenFiles * * @throws IOException + * @param argv */ - public int listOpenFiles() throws IOException { + public int listOpenFiles(String[] argv) throws IOException { + List types = new ArrayList<>(); + if (argv != null) { + List args = new ArrayList<>(Arrays.asList(argv)); + if 
(StringUtils.popOption("-blockingDecommission", args)) { + types.add(OpenFilesType.BLOCKING_DECOMMISSION); + } + } + if (types.isEmpty()) { + types.add(OpenFilesType.ALL_OPEN_FILES); + } + EnumSet openFilesTypes = EnumSet.copyOf(types); + DistributedFileSystem dfs = getDFS(); Configuration dfsConf = dfs.getConf(); URI dfsUri = dfs.getUri(); @@ -926,9 +941,9 @@ public int listOpenFiles() throws IOException { dfsConf, HAUtil.getAddressOfActive(getDFS()), ClientProtocol.class, UserGroupInformation.getCurrentUser(), false); openFilesRemoteIterator = new OpenFilesIterator(proxy.getProxy(), - FsTracer.get(dfsConf)); + FsTracer.get(dfsConf), openFilesTypes); } else { - openFilesRemoteIterator = dfs.listOpenFiles(); + openFilesRemoteIterator = dfs.listOpenFiles(openFilesTypes); } printOpenFiles(openFilesRemoteIterator); return 0; @@ -1214,9 +1229,11 @@ private void printHelp(String cmd) { + "\tIf 'incremental' is specified, it will be an incremental\n" + "\tblock report; otherwise, it will be a full block report.\n"; - String listOpenFiles = "-listOpenFiles\n" + String listOpenFiles = "-listOpenFiles [-blockingDecommission]\n" + "\tList all open files currently managed by the NameNode along\n" - + "\twith client name and client machine accessing them.\n"; + + "\twith client name and client machine accessing them.\n" + + "\tIf 'blockingDecommission' option is specified, it will list the\n" + + "\topen files only that are blocking the ongoing Decommission."; String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + "\t\tis specified.\n"; @@ -1964,7 +1981,8 @@ private static void printUsage(String cmd) { System.err.println("Usage: hdfs dfsadmin" + " [-triggerBlockReport [-incremental] ]"); } else if ("-listOpenFiles".equals(cmd)) { - System.err.println("Usage: hdfs dfsadmin [-listOpenFiles]"); + System.err.println("Usage: hdfs dfsadmin" + + " [-listOpenFiles [-blockingDecommission]]"); } else { System.err.println("Usage: hdfs dfsadmin"); System.err.println("Note: Administrative commands can only be run as the HDFS superuser."); @@ -2119,7 +2137,7 @@ public int run(String[] argv) throws Exception { return exitCode; } } else if ("-listOpenFiles".equals(cmd)) { - if (argv.length != 1) { + if ((argv.length != 1) && (argv.length != 2)) { printUsage(cmd); return exitCode; } @@ -2205,7 +2223,7 @@ public int run(String[] argv) throws Exception { } else if ("-triggerBlockReport".equals(cmd)) { exitCode = triggerBlockReport(argv); } else if ("-listOpenFiles".equals(cmd)) { - exitCode = listOpenFiles(); + exitCode = listOpenFiles(argv); } else if ("-help".equals(cmd)) { if (i < argv.length) { printHelp(argv[i]); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index ce61fa95fc3..db983e238ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -407,7 +407,7 @@ Usage: | `-getDatanodeInfo` \ | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. | | `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following
1. Datanodes heart beating with Namenode
2. Blocks waiting to be replicated
3. Blocks currently being replicated
4. Blocks waiting to be deleted | | `-triggerBlockReport` `[-incremental]` \ | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. | -| `-listOpenFiles` | List all open files currently managed by the NameNode along with client name and client machine accessing them. | +| `-listOpenFiles` `[-blockingDecommission]` | List all open files currently managed by the NameNode along with client name and client machine accessing them. | | `-help` [cmd] | Displays help for the given command or all commands if none is specified. | Runs a HDFS dfsadmin client. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java index c0cef192ff1..5d96b7bb76a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java @@ -388,9 +388,19 @@ protected static void validateCluster(DFSClient client, int numDNs) protected void startCluster(int numNameNodes, int numDatanodes, boolean setupHostsFile, long[] nodesCapacity, boolean checkDataNodeHostConfig) throws IOException { + startCluster(numNameNodes, numDatanodes, setupHostsFile, nodesCapacity, + checkDataNodeHostConfig, true); + } + + protected void startCluster(int numNameNodes, int numDatanodes, + boolean setupHostsFile, long[] nodesCapacity, + boolean checkDataNodeHostConfig, boolean federation) throws IOException { MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes)) .numDataNodes(numDatanodes); + if (federation) { + builder.nnTopology( + MiniDFSNNTopology.simpleFederatedTopology(numNameNodes)); + } if (setupHostsFile) { builder.setupHostsFile(setupHostsFile); } @@ -413,6 +423,12 @@ protected void startCluster(int numNameNodes, int numDatanodes) startCluster(numNameNodes, numDatanodes, false, null, false); } + protected void startSimpleCluster(int numNameNodes, int numDatanodes) + throws IOException { + startCluster(numNameNodes, numDatanodes, false, null, false, false); + } + + protected void startSimpleHACluster(int numDatanodes) throws IOException { cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index ac14a2a41ac..d82025c5703 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -22,16 +22,23 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Scanner; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Supplier; import com.google.common.collect.Lists; +import org.apache.commons.lang.text.StrBuilder; import 
org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; @@ -60,7 +67,9 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics; +import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Ignore; @@ -651,6 +660,174 @@ public void testDecommissionWithOpenfile() fdos.close(); } + private static String scanIntoString(final ByteArrayOutputStream baos) { + final StrBuilder sb = new StrBuilder(); + final Scanner scanner = new Scanner(baos.toString()); + while (scanner.hasNextLine()) { + sb.appendln(scanner.nextLine()); + } + scanner.close(); + return sb.toString(); + } + + private boolean verifyOpenFilesListing(String message, + HashSet closedFileSet, + HashMap openFilesMap, + ByteArrayOutputStream out, int expOpenFilesListSize) { + final String outStr = scanIntoString(out); + LOG.info(message + " - stdout: \n" + outStr); + for (Path closedFilePath : closedFileSet) { + if(outStr.contains(closedFilePath.toString())) { + return false; + } + } + HashSet openFilesNotListed = new HashSet<>(); + for (Path openFilePath : openFilesMap.keySet()) { + if(!outStr.contains(openFilePath.toString())) { + openFilesNotListed.add(openFilePath); + } + } + int actualOpenFilesListedSize = + openFilesMap.size() - openFilesNotListed.size(); + if (actualOpenFilesListedSize >= expOpenFilesListSize) { + return true; + } else { + LOG.info("Open files that are not listed yet: " + openFilesNotListed); + return false; + } + } + + private void verifyOpenFilesBlockingDecommission(HashSet closedFileSet, + HashMap openFilesMap, final int maxOpenFiles) + throws Exception { + final PrintStream oldStreamOut = System.out; + try { + final ByteArrayOutputStream toolOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(toolOut)); + final DFSAdmin dfsAdmin = new DFSAdmin(getConf()); + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + toolOut.reset(); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[]{"-listOpenFiles", "-blockingDecommission"})); + toolOut.flush(); + return verifyOpenFilesListing( + "dfsadmin -listOpenFiles -blockingDecommission", + closedFileSet, openFilesMap, toolOut, maxOpenFiles); + } catch (Exception e) { + LOG.warn("Unexpected exception: " + e); + } + return false; + } + }, 1000, 60000); + } finally { + System.setOut(oldStreamOut); + } + } + + @Test(timeout=180000) + public void testDecommissionWithOpenfileReporting() + throws Exception { + LOG.info("Starting test testDecommissionWithOpenfileReporting"); + + // Disable redundancy monitor check so that open files blocking + // decommission can be listed and verified. 
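    // Editor's note: a 1000-second redundancy-monitor interval effectively
    // pauses automatic re-replication for the duration of the test, and
    // capping listOpenFiles responses at one entry per batch forces the
    // listing to walk its batching/cursor path.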
+ getConf().setInt( + DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, + 1000); + getConf().setLong( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 1); + + //At most 1 node can be decommissioned + startSimpleCluster(1, 4); + + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + + final String[] closedFiles = new String[3]; + final String[] openFiles = new String[3]; + HashSet closedFileSet = new HashSet<>(); + HashMap openFilesMap = new HashMap<>(); + for (int i = 0; i < 3; i++) { + closedFiles[i] = "/testDecommissionWithOpenfileReporting.closed." + i; + openFiles[i] = "/testDecommissionWithOpenfileReporting.open." + i; + writeFile(fileSys, new Path(closedFiles[i]), (short)3, 10); + closedFileSet.add(new Path(closedFiles[i])); + writeFile(fileSys, new Path(openFiles[i]), (short)3, 10); + FSDataOutputStream fdos = fileSys.append(new Path(openFiles[i])); + openFilesMap.put(new Path(openFiles[i]), fdos); + } + + HashMap dnInfoMap = new HashMap<>(); + for (int i = 0; i < 3; i++) { + LocatedBlocks lbs = NameNodeAdapter.getBlockLocations( + getCluster().getNameNode(0), openFiles[i], 0, blockSize * 10); + for (DatanodeInfo dn : lbs.getLastLocatedBlock().getLocations()) { + if (dnInfoMap.containsKey(dn)) { + dnInfoMap.put(dn, dnInfoMap.get(dn) + 1); + } else { + dnInfoMap.put(dn, 1); + } + } + } + + DatanodeInfo dnToDecommission = null; + int maxDnOccurance = 0; + for (Map.Entry entry : dnInfoMap.entrySet()) { + if (entry.getValue() > maxDnOccurance) { + maxDnOccurance = entry.getValue(); + dnToDecommission = entry.getKey(); + } + } + LOG.info("XXX Dn to decommission: " + dnToDecommission + ", max: " + + maxDnOccurance); + + //decommission one of the 3 nodes which have last block + DatanodeManager dm = ns.getBlockManager().getDatanodeManager(); + ArrayList nodes = new ArrayList<>(); + dnToDecommission = dm.getDatanode(dnToDecommission.getDatanodeUuid()); + nodes.add(dnToDecommission.getXferAddr()); + initExcludeHosts(nodes); + refreshNodes(0); + waitNodeState(dnToDecommission, AdminStates.DECOMMISSION_INPROGRESS); + + // list and verify all the open files that are blocking decommission + verifyOpenFilesBlockingDecommission( + closedFileSet, openFilesMap, maxDnOccurance); + + final AtomicBoolean stopRedundancyMonitor = new AtomicBoolean(false); + Thread monitorThread = new Thread(new Runnable() { + @Override + public void run() { + while (!stopRedundancyMonitor.get()) { + try { + BlockManagerTestUtil.checkRedundancy( + getCluster().getNamesystem().getBlockManager()); + BlockManagerTestUtil.updateState( + getCluster().getNamesystem().getBlockManager()); + Thread.sleep(1000); + } catch (Exception e) { + LOG.warn("Encountered exception during redundancy monitor: " + e); + } + } + } + }); + monitorThread.start(); + + waitNodeState(dnToDecommission, AdminStates.DECOMMISSIONED); + stopRedundancyMonitor.set(true); + monitorThread.join(); + + // Open file is no more blocking decommission as all its blocks + // are re-replicated. 
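    // Editor's note: with the map cleared and an expected count of zero, this
    // final verification only asserts that the tool still exits successfully
    // and that none of the closed files appear in the -blockingDecommission
    // listing.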
+ openFilesMap.clear(); + verifyOpenFilesBlockingDecommission( + closedFileSet, openFilesMap, 0); + } + @Test(timeout = 360000) public void testDecommissionWithOpenFileAndBlockRecovery() throws IOException, InterruptedException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java index 685ea8b5019..3cb10bf1e9d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -41,6 +42,7 @@ import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.junit.After; import org.junit.Assert; @@ -254,7 +256,7 @@ private void verifyOpenFiles(HashSet closedFiles, HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); HashSet openFiles = new HashSet<>(openFileMap.keySet()); RemoteIterator openFilesRemoteItr = - hdfsAdmin.listOpenFiles(); + hdfsAdmin.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); while (openFilesRemoteItr.hasNext()) { String filePath = openFilesRemoteItr.next().getFilePath(); assertFalse(filePath + " should not be listed under open files!", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 7ee766fb294..dfb40a6c07b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -168,7 +168,17 @@ public static int getComputedDatanodeWork(final BlockManager blockManager) throw public static int computeInvalidationWork(BlockManager bm) { return bm.computeInvalidateWork(Integer.MAX_VALUE); } - + + /** + * Check the redundancy of blocks and trigger replication if needed. + * @param blockManager + */ + public static void checkRedundancy(final BlockManager blockManager) { + blockManager.computeDatanodeWork(); + blockManager.processPendingReconstructions(); + blockManager.rescanPostponedMisreplicatedBlocks(); + } + /** * Compute all the replication and invalidation work for the * given BlockManager. 
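Editor's sketch (not part of the patch): with the redundancy monitor effectively paused in testDecommissionWithOpenfileReporting, re-replication is driven by hand using the helper above; the cluster handle and stop flag below are placeholders standing in for the test's own bookkeeping.

    // Manually schedule reconstruction work while decommission is in progress.
    BlockManager bm = cluster.getNamesystem().getBlockManager();
    while (!stopRequested.get()) {               // placeholder AtomicBoolean, as in the test
      BlockManagerTestUtil.checkRedundancy(bm);  // schedule replication/reconstruction work
      BlockManagerTestUtil.updateState(bm);      // refresh block manager counters
      Thread.sleep(1000);
    }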
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java index 55bc7c3bf85..0a8da4b01b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java @@ -206,7 +206,7 @@ public void testInodeWithLeases() throws Exception { HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""), perm, 0L); when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory); - verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0); + verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory, 0, 0, 0); for (Long iNodeId : iNodeIds) { INodeFile iNodeFile = stubInodeFile(iNodeId); @@ -215,13 +215,13 @@ public void testInodeWithLeases() throws Exception { when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile); lm.addLease("holder_" + iNodeId, iNodeId); } - verifyINodeLeaseCounts(lm, rootInodeDirectory, iNodeIds.size(), - iNodeIds.size(), iNodeIds.size()); + verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory, + iNodeIds.size(), iNodeIds.size(), iNodeIds.size()); for (Long iNodeId : iNodeIds) { lm.removeLease(iNodeId); } - verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0); + verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory, 0, 0, 0); } /** @@ -246,41 +246,44 @@ public void testInodeWithLeasesAtScale() throws Exception { // Case 1: No open files int scale = 0; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale); + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, + rootInodeDirectory, scale); for (int workerCount = 1; workerCount <= LeaseManager.INODE_FILTER_WORKER_COUNT_MAX / 2; workerCount++) { // Case 2: Open files count is half of worker task size scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN / 2; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, rootInodeDirectory, scale); // Case 3: Open files count is 1 less of worker task size scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN - 1; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, rootInodeDirectory, scale); // Case 4: Open files count is equal to worker task size scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, rootInodeDirectory, scale); // Case 5: Open files count is 1 more than worker task size scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN + 1; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, rootInodeDirectory, scale); } // Case 6: Open files count is way more than worker count scale = 1279; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale); + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, + rootInodeDirectory, scale); } - private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager, - final FSDirectory fsDirectory, INodeDirectory ancestorDirectory, - int scale) throws IOException { - verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0); + private void testInodeWithLeasesAtScaleImpl(FSNamesystem fsNamesystem, + final 
LeaseManager leaseManager, final FSDirectory fsDirectory, + INodeDirectory ancestorDirectory, int scale) throws IOException { + verifyINodeLeaseCounts( + fsNamesystem, leaseManager, ancestorDirectory, 0, 0, 0); Set iNodeIds = new HashSet<>(); for (int i = 0; i < scale; i++) { @@ -293,11 +296,12 @@ private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager, when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile); leaseManager.addLease("holder_" + iNodeId, iNodeId); } - verifyINodeLeaseCounts(leaseManager, ancestorDirectory, iNodeIds.size(), - iNodeIds.size(), iNodeIds.size()); + verifyINodeLeaseCounts(fsNamesystem, leaseManager, + ancestorDirectory, iNodeIds.size(), iNodeIds.size(), iNodeIds.size()); leaseManager.removeAllLeases(); - verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0); + verifyINodeLeaseCounts(fsNamesystem, leaseManager, + ancestorDirectory, 0, 0, 0); } /** @@ -389,10 +393,10 @@ public void testInodeWithLeasesForAncestorDir() throws Exception { } - private void verifyINodeLeaseCounts(final LeaseManager leaseManager, - INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount, - int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount) - throws IOException { + private void verifyINodeLeaseCounts(FSNamesystem fsNamesystem, + LeaseManager leaseManager, INodeDirectory ancestorDirectory, + int iNodeIdWithLeaseCount, int iNodeWithLeaseCount, + int iNodeUnderAncestorLeaseCount) throws IOException { assertEquals(iNodeIdWithLeaseCount, leaseManager.getINodeIdWithLeases().size()); assertEquals(iNodeWithLeaseCount, @@ -401,6 +405,8 @@ private void verifyINodeLeaseCounts(final LeaseManager leaseManager, leaseManager.getINodeWithLeases(ancestorDirectory).size()); assertEquals(iNodeIdWithLeaseCount, leaseManager.getUnderConstructionFiles(0).size()); + assertEquals(0, (fsNamesystem.getFilesBlockingDecom(0) == null ? 
+ 0 : fsNamesystem.getFilesBlockingDecom(0).size())); } private Map createINodeTree(INodeDirectory parentDir, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java index b29019437e1..cfee7ba46c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -95,9 +97,13 @@ public void testListOpenFilesViaNameNodeRPC() throws Exception { verifyOpenFiles(openFiles); BatchedEntries openFileEntryBatchedEntries = - nnRpc.listOpenFiles(0); + nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); assertTrue("Open files list should be empty!", openFileEntryBatchedEntries.size() == 0); + BatchedEntries openFilesBlockingDecomEntries = + nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION)); + assertTrue("Open files list blocking decommission should be empty!", + openFilesBlockingDecomEntries.size() == 0); openFiles.putAll( DFSTestUtil.createOpenFiles(fs, "open-1", 1)); @@ -121,16 +127,16 @@ public void testListOpenFilesViaNameNodeRPC() throws Exception { } } - private void verifyOpenFiles(Map openFiles) - throws IOException { + private void verifyOpenFiles(Map openFiles, + EnumSet openFilesTypes) throws IOException { HashSet remainingFiles = new HashSet<>(openFiles.keySet()); OpenFileEntry lastEntry = null; BatchedEntries batchedEntries; do { if (lastEntry == null) { - batchedEntries = nnRpc.listOpenFiles(0); + batchedEntries = nnRpc.listOpenFiles(0, openFilesTypes); } else { - batchedEntries = nnRpc.listOpenFiles(lastEntry.getId()); + batchedEntries = nnRpc.listOpenFiles(lastEntry.getId(), openFilesTypes); } assertTrue("Incorrect open files list size!", batchedEntries.size() <= BATCH_SIZE); @@ -146,6 +152,13 @@ private void verifyOpenFiles(Map openFiles) remainingFiles.size() == 0); } + private void verifyOpenFiles(Map openFiles) + throws IOException { + verifyOpenFiles(openFiles, EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); + verifyOpenFiles(new HashMap<>(), + EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION)); + } + private Set createFiles(FileSystem fileSystem, String fileNamePrefix, int numFilesToCreate) throws IOException { HashSet files = new HashSet<>(); @@ -197,6 +210,8 @@ public void run() { try { assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-listOpenFiles"})); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[] {"-listOpenFiles", "-blockingDecommission"})); // Sleep for some time to avoid // flooding logs with listing. 
Thread.sleep(listingIntervalMsec); @@ -222,6 +237,8 @@ public void run() { assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-listOpenFiles"})); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[] {"-listOpenFiles", "-blockingDecommission"})); assertFalse("Client Error!", listOpenFilesError.get()); clientThread.join();
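Editor's usage sketch (not part of the patch): outside of tests, the new flag can be driven programmatically the same way these tests do, through ToolRunner and DFSAdmin; the configuration is assumed to point at a running cluster via fs.defaultFS.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class ListFilesBlockingDecommission {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Equivalent to: hdfs dfsadmin -listOpenFiles -blockingDecommission
        int exitCode = ToolRunner.run(new DFSAdmin(conf),
            new String[] {"-listOpenFiles", "-blockingDecommission"});
        System.exit(exitCode);
      }
    }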