diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index fe43d05da72..7e62d6348b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -916,6 +916,9 @@ Release 2.7.0 - UNRELEASED provided by the client is larger than the one stored in the datanode. (Brahma Reddy Battula via szetszwo) + HDFS-7960. The full block report should prune zombie storages even if + they're not empty. (cmccabe and Eddy Xu via wang) + BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS HDFS-7720. Quota by Storage Type API, tools and ClientNameNode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index c4003f157c0..825e83586b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlo import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -169,7 +170,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements @Override public DatanodeCommand blockReport(DatanodeRegistration registration, - String poolId, StorageBlockReport[] reports) throws IOException { + String poolId, StorageBlockReport[] reports, BlockReportContext context) + throws IOException { BlockReportRequestProto.Builder builder = BlockReportRequestProto .newBuilder().setRegistration(PBHelper.convert(registration)) .setBlockPoolId(poolId); @@ -191,6 +193,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements } builder.addReports(reportBuilder.build()); } + builder.setContext(PBHelper.convert(context)); BlockReportResponseProto resp; try { resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index e18081f2814..873eb6d1708 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -161,7 +161,9 @@ public class DatanodeProtocolServerSideTranslatorPB implements } try { cmd = impl.blockReport(PBHelper.convert(request.getRegistration()), - request.getBlockPoolId(), report); + request.getBlockPoolId(), report, + request.hasContext() ? + PBHelper.convert(request.getContext()) : null); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index cda138a9406..eb834852965 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -111,6 +111,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rollin import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; @@ -123,6 +124,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHe import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; @@ -195,6 +197,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; @@ -3010,4 +3013,16 @@ public class PBHelper { return targetPinnings; } + public static BlockReportContext convert(BlockReportContextProto proto) { + return new BlockReportContext(proto.getTotalRpcs(), + proto.getCurRpc(), proto.getId()); + } + + public static BlockReportContextProto convert(BlockReportContext context) { + return BlockReportContextProto.newBuilder(). + setTotalRpcs(context.getTotalRpcs()). + setCurRpc(context.getCurRpc()). + setId(context.getReportId()). + build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 246ac484159..308a8fe6618 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; @@ -1773,7 +1774,8 @@ public class BlockManager { */ public boolean processReport(final DatanodeID nodeID, final DatanodeStorage storage, - final BlockListAsLongs newReport) throws IOException { + final BlockListAsLongs newReport, BlockReportContext context, + boolean lastStorageInRpc) throws IOException { namesystem.writeLock(); final long startTime = Time.monotonicNow(); //after acquiring write lock final long endTime; @@ -1812,6 +1814,29 @@ public class BlockManager { } storageInfo.receivedBlockReport(); + if (context != null) { + storageInfo.setLastBlockReportId(context.getReportId()); + if (lastStorageInRpc) { + int rpcsSeen = node.updateBlockReportContext(context); + if (rpcsSeen >= context.getTotalRpcs()) { + List zombies = node.removeZombieStorages(); + if (zombies.isEmpty()) { + LOG.debug("processReport 0x{}: no zombie storages found.", + Long.toHexString(context.getReportId())); + } else { + for (DatanodeStorageInfo zombie : zombies) { + removeZombieReplicas(context, zombie); + } + } + node.clearBlockReportContext(); + } else { + LOG.debug("processReport 0x{}: {} more RPCs remaining in this " + + "report.", Long.toHexString(context.getReportId()), + (context.getTotalRpcs() - rpcsSeen) + ); + } + } + } } finally { endTime = Time.monotonicNow(); namesystem.writeUnlock(); @@ -1836,6 +1861,32 @@ public class BlockManager { return !node.hasStaleStorages(); } + private void removeZombieReplicas(BlockReportContext context, + DatanodeStorageInfo zombie) { + LOG.warn("processReport 0x{}: removing zombie storage {}, which no " + + "longer exists on the DataNode.", + Long.toHexString(context.getReportId()), zombie.getStorageID()); + assert(namesystem.hasWriteLock()); + Iterator iter = zombie.getBlockIterator(); + int prevBlocks = zombie.numBlocks(); + while (iter.hasNext()) { + BlockInfoContiguous block = iter.next(); + // We assume that a block can be on only one storage in a DataNode. + // That's why we pass in the DatanodeDescriptor rather than the + // DatanodeStorageInfo. + // TODO: remove this assumption in case we want to put a block on + // more than one storage on a datanode (and because it's a difficult + // assumption to really enforce) + removeStoredBlock(block, zombie.getDatanodeDescriptor()); + invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block); + } + assert(zombie.numBlocks() == 0); + LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " + + "which no longer exists on the DataNode.", + Long.toHexString(context.getReportId()), prevBlocks, + zombie.getStorageID()); + } + /** * Rescan the list of blocks which were previously postponed. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 83eebff24d6..96084a43005 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -31,6 +32,7 @@ import java.util.Set; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; @@ -64,7 +67,25 @@ public class DatanodeDescriptor extends DatanodeInfo { // Stores status of decommissioning. // If node is not decommissioning, do not use this object for anything. public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus(); - + + private long curBlockReportId = 0; + + private BitSet curBlockReportRpcsSeen = null; + + public int updateBlockReportContext(BlockReportContext context) { + if (curBlockReportId != context.getReportId()) { + curBlockReportId = context.getReportId(); + curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs()); + } + curBlockReportRpcsSeen.set(context.getCurRpc()); + return curBlockReportRpcsSeen.cardinality(); + } + + public void clearBlockReportContext() { + curBlockReportId = 0; + curBlockReportRpcsSeen = null; + } + /** Block and targets pair */ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -284,6 +305,34 @@ public class DatanodeDescriptor extends DatanodeInfo { } } + static final private List EMPTY_STORAGE_INFO_LIST = + ImmutableList.of(); + + List removeZombieStorages() { + List zombies = null; + synchronized (storageMap) { + Iterator> iter = + storageMap.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + DatanodeStorageInfo storageInfo = entry.getValue(); + if (storageInfo.getLastBlockReportId() != curBlockReportId) { + LOG.info(storageInfo.getStorageID() + " had lastBlockReportId 0x" + + Long.toHexString(storageInfo.getLastBlockReportId()) + + ", but curBlockReportId = 0x" + + Long.toHexString(curBlockReportId)); + iter.remove(); + if (zombies == null) { + zombies = new LinkedList(); + } + zombies.add(storageInfo); + } + storageInfo.setLastBlockReportId(0); + } + } + return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies; + } + /** * Remove block from the list of blocks belonging to the data-node. Remove * data-node from the block. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index c4612a325ba..be16a873149 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -115,6 +115,9 @@ public class DatanodeStorageInfo { private volatile BlockInfoContiguous blockList = null; private int numBlocks = 0; + // The ID of the last full block report which updated this storage. + private long lastBlockReportId = 0; + /** The number of block reports received */ private int blockReportCount = 0; @@ -178,7 +181,15 @@ public class DatanodeStorageInfo { this.remaining = remaining; this.blockPoolUsed = blockPoolUsed; } - + + long getLastBlockReportId() { + return lastBlockReportId; + } + + void setLastBlockReportId(long lastBlockReportId) { + this.lastBlockReportId = lastBlockReportId; + } + State getState() { return this.state; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 90f2fe670d5..10cce4545ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -434,6 +435,17 @@ class BPServiceActor implements Runnable { return sendImmediateIBR; } + private long prevBlockReportId = 0; + + private long generateUniqueBlockReportId() { + long id = System.nanoTime(); + if (id <= prevBlockReportId) { + id = prevBlockReportId + 1; + } + prevBlockReportId = id; + return id; + } + /** * Report the list blocks to the Namenode * @return DatanodeCommands returned by the NN. May be null. @@ -476,11 +488,13 @@ class BPServiceActor implements Runnable { int numRPCs = 0; boolean success = false; long brSendStartTime = monotonicNow(); + long reportId = generateUniqueBlockReportId(); try { if (totalBlockCount < dnConf.blockReportSplitThreshold) { // Below split threshold, send all reports in a single message. DatanodeCommand cmd = bpNamenode.blockReport( - bpRegistration, bpos.getBlockPoolId(), reports); + bpRegistration, bpos.getBlockPoolId(), reports, + new BlockReportContext(1, 0, reportId)); numRPCs = 1; numReportsSent = reports.length; if (cmd != null) { @@ -488,10 +502,11 @@ class BPServiceActor implements Runnable { } } else { // Send one block report per message. - for (StorageBlockReport report : reports) { - StorageBlockReport singleReport[] = { report }; + for (int r = 0; r < reports.length; r++) { + StorageBlockReport singleReport[] = { reports[r] }; DatanodeCommand cmd = bpNamenode.blockReport( - bpRegistration, bpos.getBlockPoolId(), singleReport); + bpRegistration, bpos.getBlockPoolId(), singleReport, + new BlockReportContext(reports.length, r, reportId)); numReportsSent++; numRPCs++; if (cmd != null) { @@ -507,11 +522,12 @@ class BPServiceActor implements Runnable { dn.getMetrics().addBlockReport(brSendCost); final int nCmds = cmds.size(); LOG.info((success ? "S" : "Uns") + - "uccessfully sent " + numReportsSent + - " of " + reports.length + - " blockreports for " + totalBlockCount + - " total blocks using " + numRPCs + - " RPCs. This took " + brCreateCost + + "uccessfully sent block report 0x" + + Long.toHexString(reportId) + ", containing " + reports.length + + " storage report(s), of which we sent " + numReportsSent + "." + + " The reports had " + totalBlockCount + + " total blocks and used " + numRPCs + + " RPC(s). This took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing." + " Got back " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 940478c7c90..dffeecab073 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -121,6 +121,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -1286,7 +1287,8 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // DatanodeProtocol public DatanodeCommand blockReport(DatanodeRegistration nodeReg, - String poolId, StorageBlockReport[] reports) throws IOException { + String poolId, StorageBlockReport[] reports, + BlockReportContext context) throws IOException { checkNNStartup(); verifyRequest(nodeReg); if(blockStateChangeLog.isDebugEnabled()) { @@ -1295,14 +1297,15 @@ class NameNodeRpcServer implements NamenodeProtocols { } final BlockManager bm = namesystem.getBlockManager(); boolean noStaleStorages = false; - for(StorageBlockReport r : reports) { - final BlockListAsLongs blocks = r.getBlocks(); + for (int r = 0; r < reports.length; r++) { + final BlockListAsLongs blocks = reports[r].getBlocks(); // // BlockManager.processReport accumulates information of prior calls // for the same node and storage, so the value returned by the last // call of this loop is the final updated value for noStaleStorage. // - noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks); + noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(), + blocks, context, (r == reports.length - 1)); metrics.incrStorageBlockReportOps(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java new file mode 100644 index 00000000000..a084a813cf6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.protocol; + +/** + * The context of the block report. + * + * This is a set of fields that the Datanode sends to provide context about a + * block report RPC. The context includes a unique 64-bit ID which + * identifies the block report as a whole. It also includes the total number + * of RPCs which this block report is split into, and the index into that + * total for the current RPC. + */ +public class BlockReportContext { + private final int totalRpcs; + private final int curRpc; + private final long reportId; + + public BlockReportContext(int totalRpcs, int curRpc, long reportId) { + this.totalRpcs = totalRpcs; + this.curRpc = curRpc; + this.reportId = reportId; + } + + public int getTotalRpcs() { + return totalRpcs; + } + + public int getCurRpc() { + return curRpc; + } + + public long getReportId() { + return reportId; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 047de569618..a3b6004644b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -128,20 +127,23 @@ public interface DatanodeProtocol { * Each finalized block is represented as 3 longs. Each under- * construction replica is represented as 4 longs. * This is done instead of Block[] to reduce memory used by block reports. - * + * @param reports report of blocks per storage + * @param context Context information for this block report. + * * @return - the next command for DN to process. * @throws IOException */ @Idempotent public DatanodeCommand blockReport(DatanodeRegistration registration, - String poolId, StorageBlockReport[] reports) throws IOException; + String poolId, StorageBlockReport[] reports, + BlockReportContext context) throws IOException; /** * Communicates the complete list of locally cached blocks to the NameNode. * * This method is similar to - * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])}, + * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[], BlockReportContext)}, * which is used to communicated blocks stored on disk. * * @param The datanode registration. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 0014db75e97..bce5f5677e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -227,11 +227,25 @@ message HeartbeatResponseProto { * second long represents length * third long represents gen stamp * fourth long (if under construction) represents replica state + * context - An optional field containing information about the context + * of this block report. */ message BlockReportRequestProto { required DatanodeRegistrationProto registration = 1; required string blockPoolId = 2; repeated StorageBlockReportProto reports = 3; + optional BlockReportContextProto context = 4; +} + +message BlockReportContextProto { + // The total number of RPCs this block report is broken into. + required int32 totalRpcs = 1; + + // The index of the current RPC (zero-based) + required int32 curRpc = 2; + + // The unique 64-bit ID of this block report + required int64 id = 3; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java index bebde18c078..f0dab4c0133 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; @@ -219,7 +220,8 @@ public class TestBlockListAsLongs { // check DN sends new-style BR request.set(null); nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask()); - nn.blockReport(reg, "pool", sbr); + nn.blockReport(reg, "pool", sbr, + new BlockReportContext(1, 0, System.nanoTime())); BlockReportRequestProto proto = request.get(); assertNotNull(proto); assertTrue(proto.getReports(0).getBlocksList().isEmpty()); @@ -228,7 +230,8 @@ public class TestBlockListAsLongs { // back up to prior version and check DN sends old-style BR request.set(null); nsInfo.setCapabilities(Capability.UNKNOWN.getMask()); - nn.blockReport(reg, "pool", sbr); + nn.blockReport(reg, "pool", sbr, + new BlockReportContext(1, 0, System.nanoTime())); proto = request.get(); assertNotNull(proto); assertFalse(proto.getReports(0).getBlocksList().isEmpty()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index d9ac9e550c6..707c7807b80 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -555,12 +555,12 @@ public class TestBlockManager { reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY); + BlockListAsLongs.EMPTY, null, false); assertEquals(1, ds.getBlockReportCount()); // send block report again, should NOT be processed reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY); + BlockListAsLongs.EMPTY, null, false); assertEquals(1, ds.getBlockReportCount()); // re-register as if node restarted, should update existing node @@ -571,7 +571,7 @@ public class TestBlockManager { // send block report, should be processed after restart reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY); + BlockListAsLongs.EMPTY, null, false); // Reinitialize as registration with empty storage list pruned // node.storageMap. ds = node.getStorageInfos()[0]; @@ -600,7 +600,7 @@ public class TestBlockManager { reset(node); doReturn(1).when(node).numBlocks(); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY); + BlockListAsLongs.EMPTY, null, false); assertEquals(1, ds.getBlockReportCount()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index b67ae7a6232..4b97d01b73e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -18,26 +18,40 @@ package org.apache.hadoop.hdfs.server.blockmanagement; +import com.google.common.base.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.math3.stat.inference.TestUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; import org.junit.Test; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; public class TestNameNodePrunesMissingStorages { @@ -110,7 +124,9 @@ public class TestNameNodePrunesMissingStorages { } /** - * Verify that the NameNode does not prune storages with blocks. + * Verify that the NameNode does not prune storages with blocks + * simply as a result of a heartbeat being sent missing that storage. + * * @throws IOException */ @Test (timeout=300000) @@ -118,4 +134,119 @@ public class TestNameNodePrunesMissingStorages { // Run the test with 1 storage, after the text still expect 1 storage. runTest(GenericTestUtils.getMethodName(), true, 1, 1); } + + /** + * Regression test for HDFS-7960.

+ * + * Shutting down a datanode, removing a storage directory, and restarting + * the DataNode should not produce zombie storages. + */ + @Test(timeout=300000) + public void testRemovingStorageDoesNotProduceZombies() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); + final int NUM_STORAGES_PER_DN = 2; + final MiniDFSCluster cluster = new MiniDFSCluster + .Builder(conf).numDataNodes(3) + .storagesPerDatanode(NUM_STORAGES_PER_DN) + .build(); + try { + cluster.waitActive(); + for (DataNode dn : cluster.getDataNodes()) { + assertEquals(NUM_STORAGES_PER_DN, + cluster.getNamesystem().getBlockManager(). + getDatanodeManager().getDatanode(dn.getDatanodeId()). + getStorageInfos().length); + } + // Create a file which will end up on all 3 datanodes. + final Path TEST_PATH = new Path("/foo1"); + DistributedFileSystem fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, 1024, (short) 3, 0xcafecafe); + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.triggerBlockReport(dn); + } + ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/foo1")); + cluster.getNamesystem().writeLock(); + final String storageIdToRemove; + String datanodeUuid; + // Find the first storage which this block is in. + try { + Iterator storageInfoIter = + cluster.getNamesystem().getBlockManager(). + getStorages(block.getLocalBlock()).iterator(); + assertTrue(storageInfoIter.hasNext()); + DatanodeStorageInfo info = storageInfoIter.next(); + storageIdToRemove = info.getStorageID(); + datanodeUuid = info.getDatanodeDescriptor().getDatanodeUuid(); + } finally { + cluster.getNamesystem().writeUnlock(); + } + // Find the DataNode which holds that first storage. + final DataNode datanodeToRemoveStorageFrom; + int datanodeToRemoveStorageFromIdx = 0; + while (true) { + if (datanodeToRemoveStorageFromIdx >= cluster.getDataNodes().size()) { + Assert.fail("failed to find datanode with uuid " + datanodeUuid); + datanodeToRemoveStorageFrom = null; + break; + } + DataNode dn = cluster.getDataNodes(). + get(datanodeToRemoveStorageFromIdx); + if (dn.getDatanodeUuid().equals(datanodeUuid)) { + datanodeToRemoveStorageFrom = dn; + break; + } + datanodeToRemoveStorageFromIdx++; + } + // Find the volume within the datanode which holds that first storage. + List volumes = + datanodeToRemoveStorageFrom.getFSDataset().getVolumes(); + assertEquals(NUM_STORAGES_PER_DN, volumes.size()); + String volumeDirectoryToRemove = null; + for (FsVolumeSpi volume : volumes) { + if (volume.getStorageID().equals(storageIdToRemove)) { + volumeDirectoryToRemove = volume.getBasePath(); + } + } + // Shut down the datanode and remove the volume. + // Replace the volume directory with a regular file, which will + // cause a volume failure. (If we merely removed the directory, + // it would be re-initialized with a new storage ID.) + assertNotNull(volumeDirectoryToRemove); + datanodeToRemoveStorageFrom.shutdown(); + FileUtil.fullyDelete(new File(volumeDirectoryToRemove)); + FileOutputStream fos = new FileOutputStream(volumeDirectoryToRemove); + try { + fos.write(1); + } finally { + fos.close(); + } + cluster.restartDataNode(datanodeToRemoveStorageFromIdx); + // Wait for the NameNode to remove the storage. + LOG.info("waiting for the datanode to remove " + storageIdToRemove); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + final DatanodeDescriptor dnDescriptor = + cluster.getNamesystem().getBlockManager().getDatanodeManager(). + getDatanode(datanodeToRemoveStorageFrom.getDatanodeUuid()); + assertNotNull(dnDescriptor); + DatanodeStorageInfo[] infos = dnDescriptor.getStorageInfos(); + for (DatanodeStorageInfo info : infos) { + if (info.getStorageID().equals(storageIdToRemove)) { + LOG.info("Still found storage " + storageIdToRemove + " on " + + info + "."); + return false; + } + } + assertEquals(NUM_STORAGES_PER_DN - 1, infos.length); + return true; + } + }, 10, 30000); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index de66db5fb9c..c4a2d06c0e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; @@ -613,7 +614,8 @@ public abstract class BlockReportTestBase { .when(spy).blockReport( Mockito.anyObject(), Mockito.anyString(), - Mockito.anyObject()); + Mockito.anyObject(), + Mockito.anyObject()); // Force a block report to be generated. The block report will have // an RBW replica in it. Wait for the RPC to be sent, but block diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index bc497934f25..3aa9a7b7b69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -216,7 +217,8 @@ public class TestBPOfferService { .when(mockNN2).blockReport( Mockito.anyObject(), Mockito.eq(FAKE_BPID), - Mockito.anyObject()); + Mockito.anyObject(), + Mockito.anyObject()); bpos.start(); try { @@ -406,7 +408,8 @@ public class TestBPOfferService { Mockito.verify(mockNN).blockReport( Mockito.anyObject(), Mockito.eq(FAKE_BPID), - Mockito.anyObject()); + Mockito.anyObject(), + Mockito.anyObject()); return true; } catch (Throwable t) { LOG.info("waiting on block report: " + t.getMessage()); @@ -431,7 +434,8 @@ public class TestBPOfferService { Mockito.verify(mockNN).blockReport( Mockito.anyObject(), Mockito.eq(FAKE_BPID), - Mockito.anyObject()); + Mockito.anyObject(), + Mockito.anyObject()); return true; } catch (Throwable t) { LOG.info("waiting on block report: " + t.getMessage()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java index 3238d6acf2b..c47209e9eea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; @@ -122,7 +123,8 @@ public class TestBlockHasMultipleReplicasOnSameDN { } // Should not assert! - cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports); + cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports, + new BlockReportContext(1, 0, System.nanoTime())); // Get the block locations once again. locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 0428b814f29..41e8d7b45bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; @@ -185,7 +186,7 @@ public class TestDataNodeVolumeFailure { new StorageBlockReport(dnStorage, blockList); } - cluster.getNameNodeRpc().blockReport(dnR, bpid, reports); + cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null); // verify number of blocks and files... verify(filename, filesize); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java index ac7ebc05eba..cab50b56297 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; @@ -136,7 +137,8 @@ public class TestDatanodeProtocolRetryPolicy { Mockito.verify(mockNN).blockReport( Mockito.eq(datanodeRegistration), Mockito.eq(POOL_ID), - Mockito.anyObject()); + Mockito.anyObject(), + Mockito.anyObject()); return true; } catch (Throwable t) { LOG.info("waiting on block report: " + t.getMessage()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java index a5e4d4eb829..aadd9b2020e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY; @@ -133,7 +134,7 @@ public class TestDnRespectsBlockReportSplitThreshold { Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport( any(DatanodeRegistration.class), anyString(), - captor.capture()); + captor.capture(), Mockito.anyObject()); verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); } @@ -165,7 +166,7 @@ public class TestDnRespectsBlockReportSplitThreshold { Mockito.verify(nnSpy, times(1)).blockReport( any(DatanodeRegistration.class), anyString(), - captor.capture()); + captor.capture(), Mockito.anyObject()); verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE); } @@ -197,7 +198,7 @@ public class TestDnRespectsBlockReportSplitThreshold { Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport( any(DatanodeRegistration.class), anyString(), - captor.capture()); + captor.capture(), Mockito.anyObject()); verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java index 1b03786fa39..b150b0d3452 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java @@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.util.Time; /** @@ -33,10 +35,13 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase { @Override protected void sendBlockReports(DatanodeRegistration dnR, String poolId, StorageBlockReport[] reports) throws IOException { + int i = 0; for (StorageBlockReport report : reports) { LOG.info("Sending block report for storage " + report.getStorage().getStorageID()); StorageBlockReport[] singletonReport = { report }; - cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport); + cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport, + new BlockReportContext(reports.length, i, System.nanoTime())); + i++; } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java index 036b550c668..dca3c880bf1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; @@ -34,6 +35,7 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase { protected void sendBlockReports(DatanodeRegistration dnR, String poolId, StorageBlockReport[] reports) throws IOException { LOG.info("Sending combined block reports for " + dnR); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + cluster.getNameNodeRpc().blockReport(dnR, poolId, reports, + new BlockReportContext(1, 0, System.nanoTime())); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java index efb9d980b35..3195d7d8e87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; @@ -76,7 +77,8 @@ public final class TestTriggerBlockReport { Mockito.verify(spy, times(0)).blockReport( any(DatanodeRegistration.class), anyString(), - any(StorageBlockReport[].class)); + any(StorageBlockReport[].class), + Mockito.anyObject()); Mockito.verify(spy, times(1)).blockReceivedAndDeleted( any(DatanodeRegistration.class), anyString(), @@ -113,7 +115,8 @@ public final class TestTriggerBlockReport { Mockito.verify(spy, timeout(60000)).blockReport( any(DatanodeRegistration.class), anyString(), - any(StorageBlockReport[].class)); + any(StorageBlockReport[].class), + Mockito.anyObject()); } cluster.shutdown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index bc3c6b513cd..9e24f7277ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -939,7 +940,8 @@ public class NNThroughputBenchmark implements Tool { new StorageBlockReport(storage, BlockListAsLongs.EMPTY) }; nameNodeProto.blockReport(dnRegistration, - nameNode.getNamesystem().getBlockPoolId(), reports); + nameNode.getNamesystem().getBlockPoolId(), reports, + new BlockReportContext(1, 0, System.nanoTime())); } /** @@ -1184,8 +1186,9 @@ public class NNThroughputBenchmark implements Tool { long start = Time.now(); StorageBlockReport[] report = { new StorageBlockReport( dn.storage, dn.getBlockReportList()) }; - nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem() - .getBlockPoolId(), report); + nameNodeProto.blockReport(dn.dnRegistration, + nameNode.getNamesystem().getBlockPoolId(), report, + new BlockReportContext(1, 0, System.nanoTime())); long end = Time.now(); return end-start; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index ee80b33810b..92c329e332a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -107,7 +108,8 @@ public class TestDeadDatanode { new DatanodeStorage(reg.getDatanodeUuid()), BlockListAsLongs.EMPTY) }; try { - dnp.blockReport(reg, poolId, report); + dnp.blockReport(reg, poolId, report, + new BlockReportContext(1, 0, System.nanoTime())); fail("Expected IOException is not thrown"); } catch (IOException ex) { // Expected diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java index fa7a3079b15..74358bbd829 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.io.IOUtils; @@ -547,7 +548,8 @@ public class TestDNFencing { .when(spy).blockReport( Mockito.anyObject(), Mockito.anyString(), - Mockito.anyObject()); + Mockito.anyObject(), + Mockito.anyObject()); dn.scheduleAllBlockReport(0); delayer.waitForCall();