HDFS-7960. The full block report should prune zombie storages even if they're not empty. Contributed by Colin McCabe and Eddy Xu.

(cherry picked from commit 50ee8f4e67)
This commit is contained in:
Andrew Wang 2015-03-23 22:00:34 -07:00
parent fe693b72de
commit 2f46ee50bd
27 changed files with 433 additions and 49 deletions

View File

@ -941,6 +941,9 @@ Release 2.7.0 - UNRELEASED
provided by the client is larger than the one stored in the datanode. provided by the client is larger than the one stored in the datanode.
(Brahma Reddy Battula via szetszwo) (Brahma Reddy Battula via szetszwo)
HDFS-7960. The full block report should prune zombie storages even if
they're not empty. (cmccabe and Eddy Xu via wang)
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlo
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -169,7 +170,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
@Override @Override
public DatanodeCommand blockReport(DatanodeRegistration registration, public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports) throws IOException { String poolId, StorageBlockReport[] reports, BlockReportContext context)
throws IOException {
BlockReportRequestProto.Builder builder = BlockReportRequestProto BlockReportRequestProto.Builder builder = BlockReportRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration)) .newBuilder().setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId); .setBlockPoolId(poolId);
@ -191,6 +193,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
} }
builder.addReports(reportBuilder.build()); builder.addReports(reportBuilder.build());
} }
builder.setContext(PBHelper.convert(context));
BlockReportResponseProto resp; BlockReportResponseProto resp;
try { try {
resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build()); resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());

View File

@ -161,7 +161,9 @@ public class DatanodeProtocolServerSideTranslatorPB implements
} }
try { try {
cmd = impl.blockReport(PBHelper.convert(request.getRegistration()), cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
request.getBlockPoolId(), report); request.getBlockPoolId(), report,
request.hasContext() ?
PBHelper.convert(request.getContext()) : null);
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }

View File

@ -111,6 +111,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rollin
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
@ -123,6 +124,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHe
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@ -195,6 +197,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@ -3010,4 +3013,16 @@ public class PBHelper {
return targetPinnings; return targetPinnings;
} }
public static BlockReportContext convert(BlockReportContextProto proto) {
return new BlockReportContext(proto.getTotalRpcs(),
proto.getCurRpc(), proto.getId());
}
public static BlockReportContextProto convert(BlockReportContext context) {
return BlockReportContextProto.newBuilder().
setTotalRpcs(context.getTotalRpcs()).
setCurRpc(context.getCurRpc()).
setId(context.getReportId()).
build();
}
} }

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@ -1773,7 +1774,8 @@ public class BlockManager {
*/ */
public boolean processReport(final DatanodeID nodeID, public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage, final DatanodeStorage storage,
final BlockListAsLongs newReport) throws IOException { final BlockListAsLongs newReport, BlockReportContext context,
boolean lastStorageInRpc) throws IOException {
namesystem.writeLock(); namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime; final long endTime;
@ -1812,6 +1814,29 @@ public class BlockManager {
} }
storageInfo.receivedBlockReport(); storageInfo.receivedBlockReport();
if (context != null) {
storageInfo.setLastBlockReportId(context.getReportId());
if (lastStorageInRpc) {
int rpcsSeen = node.updateBlockReportContext(context);
if (rpcsSeen >= context.getTotalRpcs()) {
List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
if (zombies.isEmpty()) {
LOG.debug("processReport 0x{}: no zombie storages found.",
Long.toHexString(context.getReportId()));
} else {
for (DatanodeStorageInfo zombie : zombies) {
removeZombieReplicas(context, zombie);
}
}
node.clearBlockReportContext();
} else {
LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
"report.", Long.toHexString(context.getReportId()),
(context.getTotalRpcs() - rpcsSeen)
);
}
}
}
} finally { } finally {
endTime = Time.monotonicNow(); endTime = Time.monotonicNow();
namesystem.writeUnlock(); namesystem.writeUnlock();
@ -1836,6 +1861,32 @@ public class BlockManager {
return !node.hasStaleStorages(); return !node.hasStaleStorages();
} }
private void removeZombieReplicas(BlockReportContext context,
DatanodeStorageInfo zombie) {
LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
"longer exists on the DataNode.",
Long.toHexString(context.getReportId()), zombie.getStorageID());
assert(namesystem.hasWriteLock());
Iterator<BlockInfoContiguous> iter = zombie.getBlockIterator();
int prevBlocks = zombie.numBlocks();
while (iter.hasNext()) {
BlockInfoContiguous block = iter.next();
// We assume that a block can be on only one storage in a DataNode.
// That's why we pass in the DatanodeDescriptor rather than the
// DatanodeStorageInfo.
// TODO: remove this assumption in case we want to put a block on
// more than one storage on a datanode (and because it's a difficult
// assumption to really enforce)
removeStoredBlock(block, zombie.getDatanodeDescriptor());
invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
}
assert(zombie.numBlocks() == 0);
LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
"which no longer exists on the DataNode.",
Long.toHexString(context.getReportId()), prevBlocks,
zombie.getStorageID());
}
/** /**
* Rescan the list of blocks which were previously postponed. * Rescan the list of blocks which were previously postponed.
*/ */

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement; package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -31,6 +32,7 @@ import java.util.Set;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@ -65,6 +68,24 @@ public class DatanodeDescriptor extends DatanodeInfo {
// If node is not decommissioning, do not use this object for anything. // If node is not decommissioning, do not use this object for anything.
public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus(); public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus();
private long curBlockReportId = 0;
private BitSet curBlockReportRpcsSeen = null;
public int updateBlockReportContext(BlockReportContext context) {
if (curBlockReportId != context.getReportId()) {
curBlockReportId = context.getReportId();
curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
}
curBlockReportRpcsSeen.set(context.getCurRpc());
return curBlockReportRpcsSeen.cardinality();
}
public void clearBlockReportContext() {
curBlockReportId = 0;
curBlockReportRpcsSeen = null;
}
/** Block and targets pair */ /** Block and targets pair */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
@ -284,6 +305,34 @@ public class DatanodeDescriptor extends DatanodeInfo {
} }
} }
static final private List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
ImmutableList.of();
List<DatanodeStorageInfo> removeZombieStorages() {
List<DatanodeStorageInfo> zombies = null;
synchronized (storageMap) {
Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
storageMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
DatanodeStorageInfo storageInfo = entry.getValue();
if (storageInfo.getLastBlockReportId() != curBlockReportId) {
LOG.info(storageInfo.getStorageID() + " had lastBlockReportId 0x" +
Long.toHexString(storageInfo.getLastBlockReportId()) +
", but curBlockReportId = 0x" +
Long.toHexString(curBlockReportId));
iter.remove();
if (zombies == null) {
zombies = new LinkedList<DatanodeStorageInfo>();
}
zombies.add(storageInfo);
}
storageInfo.setLastBlockReportId(0);
}
}
return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
}
/** /**
* Remove block from the list of blocks belonging to the data-node. Remove * Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block. * data-node from the block.

View File

@ -115,6 +115,9 @@ public class DatanodeStorageInfo {
private volatile BlockInfoContiguous blockList = null; private volatile BlockInfoContiguous blockList = null;
private int numBlocks = 0; private int numBlocks = 0;
// The ID of the last full block report which updated this storage.
private long lastBlockReportId = 0;
/** The number of block reports received */ /** The number of block reports received */
private int blockReportCount = 0; private int blockReportCount = 0;
@ -179,6 +182,14 @@ public class DatanodeStorageInfo {
this.blockPoolUsed = blockPoolUsed; this.blockPoolUsed = blockPoolUsed;
} }
long getLastBlockReportId() {
return lastBlockReportId;
}
void setLastBlockReportId(long lastBlockReportId) {
this.lastBlockReportId = lastBlockReportId;
}
State getState() { State getState() {
return this.state; return this.state;
} }

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -434,6 +435,17 @@ class BPServiceActor implements Runnable {
return sendImmediateIBR; return sendImmediateIBR;
} }
private long prevBlockReportId = 0;
private long generateUniqueBlockReportId() {
long id = System.nanoTime();
if (id <= prevBlockReportId) {
id = prevBlockReportId + 1;
}
prevBlockReportId = id;
return id;
}
/** /**
* Report the list blocks to the Namenode * Report the list blocks to the Namenode
* @return DatanodeCommands returned by the NN. May be null. * @return DatanodeCommands returned by the NN. May be null.
@ -476,11 +488,13 @@ class BPServiceActor implements Runnable {
int numRPCs = 0; int numRPCs = 0;
boolean success = false; boolean success = false;
long brSendStartTime = monotonicNow(); long brSendStartTime = monotonicNow();
long reportId = generateUniqueBlockReportId();
try { try {
if (totalBlockCount < dnConf.blockReportSplitThreshold) { if (totalBlockCount < dnConf.blockReportSplitThreshold) {
// Below split threshold, send all reports in a single message. // Below split threshold, send all reports in a single message.
DatanodeCommand cmd = bpNamenode.blockReport( DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), reports); bpRegistration, bpos.getBlockPoolId(), reports,
new BlockReportContext(1, 0, reportId));
numRPCs = 1; numRPCs = 1;
numReportsSent = reports.length; numReportsSent = reports.length;
if (cmd != null) { if (cmd != null) {
@ -488,10 +502,11 @@ class BPServiceActor implements Runnable {
} }
} else { } else {
// Send one block report per message. // Send one block report per message.
for (StorageBlockReport report : reports) { for (int r = 0; r < reports.length; r++) {
StorageBlockReport singleReport[] = { report }; StorageBlockReport singleReport[] = { reports[r] };
DatanodeCommand cmd = bpNamenode.blockReport( DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport); bpRegistration, bpos.getBlockPoolId(), singleReport,
new BlockReportContext(reports.length, r, reportId));
numReportsSent++; numReportsSent++;
numRPCs++; numRPCs++;
if (cmd != null) { if (cmd != null) {
@ -507,11 +522,12 @@ class BPServiceActor implements Runnable {
dn.getMetrics().addBlockReport(brSendCost); dn.getMetrics().addBlockReport(brSendCost);
final int nCmds = cmds.size(); final int nCmds = cmds.size();
LOG.info((success ? "S" : "Uns") + LOG.info((success ? "S" : "Uns") +
"uccessfully sent " + numReportsSent + "uccessfully sent block report 0x" +
" of " + reports.length + Long.toHexString(reportId) + ", containing " + reports.length +
" blockreports for " + totalBlockCount + " storage report(s), of which we sent " + numReportsSent + "." +
" total blocks using " + numRPCs + " The reports had " + totalBlockCount +
" RPCs. This took " + brCreateCost + " total blocks and used " + numRPCs +
" RPC(s). This took " + brCreateCost +
" msec to generate and " + brSendCost + " msec to generate and " + brSendCost +
" msecs for RPC and NN processing." + " msecs for RPC and NN processing." +
" Got back " + " Got back " +

View File

@ -121,6 +121,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -1286,7 +1287,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // DatanodeProtocol @Override // DatanodeProtocol
public DatanodeCommand blockReport(DatanodeRegistration nodeReg, public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
String poolId, StorageBlockReport[] reports) throws IOException { String poolId, StorageBlockReport[] reports,
BlockReportContext context) throws IOException {
checkNNStartup(); checkNNStartup();
verifyRequest(nodeReg); verifyRequest(nodeReg);
if(blockStateChangeLog.isDebugEnabled()) { if(blockStateChangeLog.isDebugEnabled()) {
@ -1295,14 +1297,15 @@ class NameNodeRpcServer implements NamenodeProtocols {
} }
final BlockManager bm = namesystem.getBlockManager(); final BlockManager bm = namesystem.getBlockManager();
boolean noStaleStorages = false; boolean noStaleStorages = false;
for(StorageBlockReport r : reports) { for (int r = 0; r < reports.length; r++) {
final BlockListAsLongs blocks = r.getBlocks(); final BlockListAsLongs blocks = reports[r].getBlocks();
// //
// BlockManager.processReport accumulates information of prior calls // BlockManager.processReport accumulates information of prior calls
// for the same node and storage, so the value returned by the last // for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage. // call of this loop is the final updated value for noStaleStorage.
// //
noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks); noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
blocks, context, (r == reports.length - 1));
metrics.incrStorageBlockReportOps(); metrics.incrStorageBlockReportOps();
} }

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
/**
* The context of the block report.
*
* This is a set of fields that the Datanode sends to provide context about a
* block report RPC. The context includes a unique 64-bit ID which
* identifies the block report as a whole. It also includes the total number
* of RPCs which this block report is split into, and the index into that
* total for the current RPC.
*/
public class BlockReportContext {
private final int totalRpcs;
private final int curRpc;
private final long reportId;
public BlockReportContext(int totalRpcs, int curRpc, long reportId) {
this.totalRpcs = totalRpcs;
this.curRpc = curRpc;
this.reportId = reportId;
}
public int getTotalRpcs() {
return totalRpcs;
}
public int getCurRpc() {
return curRpc;
}
public long getReportId() {
return reportId;
}
}

View File

@ -23,7 +23,6 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -128,20 +127,23 @@ public interface DatanodeProtocol {
* Each finalized block is represented as 3 longs. Each under- * Each finalized block is represented as 3 longs. Each under-
* construction replica is represented as 4 longs. * construction replica is represented as 4 longs.
* This is done instead of Block[] to reduce memory used by block reports. * This is done instead of Block[] to reduce memory used by block reports.
* @param reports report of blocks per storage
* @param context Context information for this block report.
* *
* @return - the next command for DN to process. * @return - the next command for DN to process.
* @throws IOException * @throws IOException
*/ */
@Idempotent @Idempotent
public DatanodeCommand blockReport(DatanodeRegistration registration, public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports) throws IOException; String poolId, StorageBlockReport[] reports,
BlockReportContext context) throws IOException;
/** /**
* Communicates the complete list of locally cached blocks to the NameNode. * Communicates the complete list of locally cached blocks to the NameNode.
* *
* This method is similar to * This method is similar to
* {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])}, * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[], BlockReportContext)},
* which is used to communicated blocks stored on disk. * which is used to communicated blocks stored on disk.
* *
* @param The datanode registration. * @param The datanode registration.

View File

@ -227,11 +227,25 @@ message HeartbeatResponseProto {
* second long represents length * second long represents length
* third long represents gen stamp * third long represents gen stamp
* fourth long (if under construction) represents replica state * fourth long (if under construction) represents replica state
* context - An optional field containing information about the context
* of this block report.
*/ */
message BlockReportRequestProto { message BlockReportRequestProto {
required DatanodeRegistrationProto registration = 1; required DatanodeRegistrationProto registration = 1;
required string blockPoolId = 2; required string blockPoolId = 2;
repeated StorageBlockReportProto reports = 3; repeated StorageBlockReportProto reports = 3;
optional BlockReportContextProto context = 4;
}
message BlockReportContextProto {
// The total number of RPCs this block report is broken into.
required int32 totalRpcs = 1;
// The index of the current RPC (zero-based)
required int32 curRpc = 2;
// The unique 64-bit ID of this block report
required int64 id = 3;
} }
/** /**

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -219,7 +220,8 @@ public class TestBlockListAsLongs {
// check DN sends new-style BR // check DN sends new-style BR
request.set(null); request.set(null);
nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask()); nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
nn.blockReport(reg, "pool", sbr); nn.blockReport(reg, "pool", sbr,
new BlockReportContext(1, 0, System.nanoTime()));
BlockReportRequestProto proto = request.get(); BlockReportRequestProto proto = request.get();
assertNotNull(proto); assertNotNull(proto);
assertTrue(proto.getReports(0).getBlocksList().isEmpty()); assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@ -228,7 +230,8 @@ public class TestBlockListAsLongs {
// back up to prior version and check DN sends old-style BR // back up to prior version and check DN sends old-style BR
request.set(null); request.set(null);
nsInfo.setCapabilities(Capability.UNKNOWN.getMask()); nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
nn.blockReport(reg, "pool", sbr); nn.blockReport(reg, "pool", sbr,
new BlockReportContext(1, 0, System.nanoTime()));
proto = request.get(); proto = request.get();
assertNotNull(proto); assertNotNull(proto);
assertFalse(proto.getReports(0).getBlocksList().isEmpty()); assertFalse(proto.getReports(0).getBlocksList().isEmpty());

View File

@ -555,12 +555,12 @@ public class TestBlockManager {
reset(node); reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY); BlockListAsLongs.EMPTY, null, false);
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed // send block report again, should NOT be processed
reset(node); reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY); BlockListAsLongs.EMPTY, null, false);
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
// re-register as if node restarted, should update existing node // re-register as if node restarted, should update existing node
@ -571,7 +571,7 @@ public class TestBlockManager {
// send block report, should be processed after restart // send block report, should be processed after restart
reset(node); reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY); BlockListAsLongs.EMPTY, null, false);
// Reinitialize as registration with empty storage list pruned // Reinitialize as registration with empty storage list pruned
// node.storageMap. // node.storageMap.
ds = node.getStorageInfos()[0]; ds = node.getStorageInfos()[0];
@ -600,7 +600,7 @@ public class TestBlockManager {
reset(node); reset(node);
doReturn(1).when(node).numBlocks(); doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY); BlockListAsLongs.EMPTY, null, false);
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
} }

View File

@ -18,26 +18,40 @@
package org.apache.hadoop.hdfs.server.blockmanagement; package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Supplier;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.stat.inference.TestUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
public class TestNameNodePrunesMissingStorages { public class TestNameNodePrunesMissingStorages {
@ -110,7 +124,9 @@ public class TestNameNodePrunesMissingStorages {
} }
/** /**
* Verify that the NameNode does not prune storages with blocks. * Verify that the NameNode does not prune storages with blocks
* simply as a result of a heartbeat being sent missing that storage.
*
* @throws IOException * @throws IOException
*/ */
@Test (timeout=300000) @Test (timeout=300000)
@ -118,4 +134,119 @@ public class TestNameNodePrunesMissingStorages {
// Run the test with 1 storage, after the text still expect 1 storage. // Run the test with 1 storage, after the text still expect 1 storage.
runTest(GenericTestUtils.getMethodName(), true, 1, 1); runTest(GenericTestUtils.getMethodName(), true, 1, 1);
} }
/**
* Regression test for HDFS-7960.<p/>
*
* Shutting down a datanode, removing a storage directory, and restarting
* the DataNode should not produce zombie storages.
*/
@Test(timeout=300000)
public void testRemovingStorageDoesNotProduceZombies() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
final int NUM_STORAGES_PER_DN = 2;
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf).numDataNodes(3)
.storagesPerDatanode(NUM_STORAGES_PER_DN)
.build();
try {
cluster.waitActive();
for (DataNode dn : cluster.getDataNodes()) {
assertEquals(NUM_STORAGES_PER_DN,
cluster.getNamesystem().getBlockManager().
getDatanodeManager().getDatanode(dn.getDatanodeId()).
getStorageInfos().length);
}
// Create a file which will end up on all 3 datanodes.
final Path TEST_PATH = new Path("/foo1");
DistributedFileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH, 1024, (short) 3, 0xcafecafe);
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.triggerBlockReport(dn);
}
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/foo1"));
cluster.getNamesystem().writeLock();
final String storageIdToRemove;
String datanodeUuid;
// Find the first storage which this block is in.
try {
Iterator<DatanodeStorageInfo> storageInfoIter =
cluster.getNamesystem().getBlockManager().
getStorages(block.getLocalBlock()).iterator();
assertTrue(storageInfoIter.hasNext());
DatanodeStorageInfo info = storageInfoIter.next();
storageIdToRemove = info.getStorageID();
datanodeUuid = info.getDatanodeDescriptor().getDatanodeUuid();
} finally {
cluster.getNamesystem().writeUnlock();
}
// Find the DataNode which holds that first storage.
final DataNode datanodeToRemoveStorageFrom;
int datanodeToRemoveStorageFromIdx = 0;
while (true) {
if (datanodeToRemoveStorageFromIdx >= cluster.getDataNodes().size()) {
Assert.fail("failed to find datanode with uuid " + datanodeUuid);
datanodeToRemoveStorageFrom = null;
break;
}
DataNode dn = cluster.getDataNodes().
get(datanodeToRemoveStorageFromIdx);
if (dn.getDatanodeUuid().equals(datanodeUuid)) {
datanodeToRemoveStorageFrom = dn;
break;
}
datanodeToRemoveStorageFromIdx++;
}
// Find the volume within the datanode which holds that first storage.
List<? extends FsVolumeSpi> volumes =
datanodeToRemoveStorageFrom.getFSDataset().getVolumes();
assertEquals(NUM_STORAGES_PER_DN, volumes.size());
String volumeDirectoryToRemove = null;
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageID().equals(storageIdToRemove)) {
volumeDirectoryToRemove = volume.getBasePath();
}
}
// Shut down the datanode and remove the volume.
// Replace the volume directory with a regular file, which will
// cause a volume failure. (If we merely removed the directory,
// it would be re-initialized with a new storage ID.)
assertNotNull(volumeDirectoryToRemove);
datanodeToRemoveStorageFrom.shutdown();
FileUtil.fullyDelete(new File(volumeDirectoryToRemove));
FileOutputStream fos = new FileOutputStream(volumeDirectoryToRemove);
try {
fos.write(1);
} finally {
fos.close();
}
cluster.restartDataNode(datanodeToRemoveStorageFromIdx);
// Wait for the NameNode to remove the storage.
LOG.info("waiting for the datanode to remove " + storageIdToRemove);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
final DatanodeDescriptor dnDescriptor =
cluster.getNamesystem().getBlockManager().getDatanodeManager().
getDatanode(datanodeToRemoveStorageFrom.getDatanodeUuid());
assertNotNull(dnDescriptor);
DatanodeStorageInfo[] infos = dnDescriptor.getStorageInfos();
for (DatanodeStorageInfo info : infos) {
if (info.getStorageID().equals(storageIdToRemove)) {
LOG.info("Still found storage " + storageIdToRemove + " on " +
info + ".");
return false;
}
}
assertEquals(NUM_STORAGES_PER_DN - 1, infos.length);
return true;
}
}, 10, 30000);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
} }

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -613,7 +614,8 @@ public abstract class BlockReportTestBase {
.when(spy).blockReport( .when(spy).blockReport(
Mockito.<DatanodeRegistration>anyObject(), Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(), Mockito.anyString(),
Mockito.<StorageBlockReport[]>anyObject()); Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
// Force a block report to be generated. The block report will have // Force a block report to be generated. The block report will have
// an RBW replica in it. Wait for the RPC to be sent, but block // an RBW replica in it. Wait for the RPC to be sent, but block

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -216,7 +217,8 @@ public class TestBPOfferService {
.when(mockNN2).blockReport( .when(mockNN2).blockReport(
Mockito.<DatanodeRegistration>anyObject(), Mockito.<DatanodeRegistration>anyObject(),
Mockito.eq(FAKE_BPID), Mockito.eq(FAKE_BPID),
Mockito.<StorageBlockReport[]>anyObject()); Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
bpos.start(); bpos.start();
try { try {
@ -406,7 +408,8 @@ public class TestBPOfferService {
Mockito.verify(mockNN).blockReport( Mockito.verify(mockNN).blockReport(
Mockito.<DatanodeRegistration>anyObject(), Mockito.<DatanodeRegistration>anyObject(),
Mockito.eq(FAKE_BPID), Mockito.eq(FAKE_BPID),
Mockito.<StorageBlockReport[]>anyObject()); Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
return true; return true;
} catch (Throwable t) { } catch (Throwable t) {
LOG.info("waiting on block report: " + t.getMessage()); LOG.info("waiting on block report: " + t.getMessage());
@ -431,7 +434,8 @@ public class TestBPOfferService {
Mockito.verify(mockNN).blockReport( Mockito.verify(mockNN).blockReport(
Mockito.<DatanodeRegistration>anyObject(), Mockito.<DatanodeRegistration>anyObject(),
Mockito.eq(FAKE_BPID), Mockito.eq(FAKE_BPID),
Mockito.<StorageBlockReport[]>anyObject()); Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
return true; return true;
} catch (Throwable t) { } catch (Throwable t) {
LOG.info("waiting on block report: " + t.getMessage()); LOG.info("waiting on block report: " + t.getMessage());

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -122,7 +123,8 @@ public class TestBlockHasMultipleReplicasOnSameDN {
} }
// Should not assert! // Should not assert!
cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports); cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
new BlockReportContext(1, 0, System.nanoTime()));
// Get the block locations once again. // Get the block locations once again.
locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS); locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -185,7 +186,7 @@ public class TestDataNodeVolumeFailure {
new StorageBlockReport(dnStorage, blockList); new StorageBlockReport(dnStorage, blockList);
} }
cluster.getNameNodeRpc().blockReport(dnR, bpid, reports); cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);
// verify number of blocks and files... // verify number of blocks and files...
verify(filename, filesize); verify(filename, filesize);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@ -136,7 +137,8 @@ public class TestDatanodeProtocolRetryPolicy {
Mockito.verify(mockNN).blockReport( Mockito.verify(mockNN).blockReport(
Mockito.eq(datanodeRegistration), Mockito.eq(datanodeRegistration),
Mockito.eq(POOL_ID), Mockito.eq(POOL_ID),
Mockito.<StorageBlockReport[]>anyObject()); Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
return true; return true;
} catch (Throwable t) { } catch (Throwable t) {
LOG.info("waiting on block report: " + t.getMessage()); LOG.info("waiting on block report: " + t.getMessage());

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
@ -133,7 +134,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport( Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
any(DatanodeRegistration.class), any(DatanodeRegistration.class),
anyString(), anyString(),
captor.capture()); captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
} }
@ -165,7 +166,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
Mockito.verify(nnSpy, times(1)).blockReport( Mockito.verify(nnSpy, times(1)).blockReport(
any(DatanodeRegistration.class), any(DatanodeRegistration.class),
anyString(), anyString(),
captor.capture()); captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE); verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
} }
@ -197,7 +198,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport( Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
any(DatanodeRegistration.class), any(DatanodeRegistration.class),
anyString(), anyString(),
captor.capture()); captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
} }

View File

@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.util.Time;
/** /**
@ -33,10 +35,13 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
@Override @Override
protected void sendBlockReports(DatanodeRegistration dnR, String poolId, protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException { StorageBlockReport[] reports) throws IOException {
int i = 0;
for (StorageBlockReport report : reports) { for (StorageBlockReport report : reports) {
LOG.info("Sending block report for storage " + report.getStorage().getStorageID()); LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report }; StorageBlockReport[] singletonReport = { report };
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport); cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
new BlockReportContext(reports.length, i, System.nanoTime()));
i++;
} }
} }
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -34,6 +35,7 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
protected void sendBlockReports(DatanodeRegistration dnR, String poolId, protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException { StorageBlockReport[] reports) throws IOException {
LOG.info("Sending combined block reports for " + dnR); LOG.info("Sending combined block reports for " + dnR);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
new BlockReportContext(1, 0, System.nanoTime()));
} }
} }

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@ -76,7 +77,8 @@ public final class TestTriggerBlockReport {
Mockito.verify(spy, times(0)).blockReport( Mockito.verify(spy, times(0)).blockReport(
any(DatanodeRegistration.class), any(DatanodeRegistration.class),
anyString(), anyString(),
any(StorageBlockReport[].class)); any(StorageBlockReport[].class),
Mockito.<BlockReportContext>anyObject());
Mockito.verify(spy, times(1)).blockReceivedAndDeleted( Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
any(DatanodeRegistration.class), any(DatanodeRegistration.class),
anyString(), anyString(),
@ -113,7 +115,8 @@ public final class TestTriggerBlockReport {
Mockito.verify(spy, timeout(60000)).blockReport( Mockito.verify(spy, timeout(60000)).blockReport(
any(DatanodeRegistration.class), any(DatanodeRegistration.class),
anyString(), anyString(),
any(StorageBlockReport[].class)); any(StorageBlockReport[].class),
Mockito.<BlockReportContext>anyObject());
} }
cluster.shutdown(); cluster.shutdown();

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -939,7 +940,8 @@ public class NNThroughputBenchmark implements Tool {
new StorageBlockReport(storage, BlockListAsLongs.EMPTY) new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
}; };
nameNodeProto.blockReport(dnRegistration, nameNodeProto.blockReport(dnRegistration,
nameNode.getNamesystem().getBlockPoolId(), reports); nameNode.getNamesystem().getBlockPoolId(), reports,
new BlockReportContext(1, 0, System.nanoTime()));
} }
/** /**
@ -1184,8 +1186,9 @@ public class NNThroughputBenchmark implements Tool {
long start = Time.now(); long start = Time.now();
StorageBlockReport[] report = { new StorageBlockReport( StorageBlockReport[] report = { new StorageBlockReport(
dn.storage, dn.getBlockReportList()) }; dn.storage, dn.getBlockReportList()) };
nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem() nameNodeProto.blockReport(dn.dnRegistration,
.getBlockPoolId(), report); nameNode.getNamesystem().getBlockPoolId(), report,
new BlockReportContext(1, 0, System.nanoTime()));
long end = Time.now(); long end = Time.now();
return end-start; return end-start;
} }

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -107,7 +108,8 @@ public class TestDeadDatanode {
new DatanodeStorage(reg.getDatanodeUuid()), new DatanodeStorage(reg.getDatanodeUuid()),
BlockListAsLongs.EMPTY) }; BlockListAsLongs.EMPTY) };
try { try {
dnp.blockReport(reg, poolId, report); dnp.blockReport(reg, poolId, report,
new BlockReportContext(1, 0, System.nanoTime()));
fail("Expected IOException is not thrown"); fail("Expected IOException is not thrown");
} catch (IOException ex) { } catch (IOException ex) {
// Expected // Expected

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -547,7 +548,8 @@ public class TestDNFencing {
.when(spy).blockReport( .when(spy).blockReport(
Mockito.<DatanodeRegistration>anyObject(), Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(), Mockito.anyString(),
Mockito.<StorageBlockReport[]>anyObject()); Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
dn.scheduleAllBlockReport(0); dn.scheduleAllBlockReport(0);
delayer.waitForCall(); delayer.waitForCall();