HDFS-7960. The full block report should prune zombie storages even if they're not empty. Contributed by Colin McCabe and Eddy Xu.

(cherry picked from commit 50ee8f4e67)
(cherry picked from commit 2f46ee50bd)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java

(cherry picked from commit 03d4af39e794dc03d764122077b434d658b6405e)
Andrew Wang 2015-03-23 22:00:34 -07:00 committed by Vinod Kumar Vavilapalli
parent 9b105b78d4
commit 0b1e66f01d
25 changed files with 425 additions and 46 deletions
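The diffs below thread a BlockReportContext through the full block report path: the DataNode stamps every RPC of a report with one 64-bit ID, the NameNode records that ID on each storage it hears about, and once all RPCs of the report have arrived it prunes any storage that was not stamped. The following standalone sketch (hypothetical, simplified types — not the actual BlockManager or DatanodeDescriptor code) illustrates that counting-and-pruning idea:

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    // Simplified sketch of the zombie-storage pruning idea in this commit.
    // All names here are illustrative stand-ins, not Hadoop classes.
    class ZombiePruningSketch {
      // Last full-report ID recorded per storage ID of one datanode.
      private final Map<String, Long> lastReportIdByStorage = new HashMap<>();
      private long curReportId = 0;
      private BitSet rpcsSeen = null;

      ZombiePruningSketch(Collection<String> knownStorages) {
        for (String s : knownStorages) {
          lastReportIdByStorage.put(s, 0L);
        }
      }

      // Called once per storage report RPC; returns storages to prune, if any.
      List<String> processRpc(long reportId, int curRpc, int totalRpcs,
                              String storageId) {
        // Stamp the reported storage with this report's ID (mirrors
        // DatanodeStorageInfo.setLastBlockReportId in the diff below).
        lastReportIdByStorage.put(storageId, reportId);

        // Track which RPCs of this report have been seen (mirrors
        // DatanodeDescriptor.updateBlockReportContext).
        if (curReportId != reportId) {
          curReportId = reportId;
          rpcsSeen = new BitSet(totalRpcs);
        }
        rpcsSeen.set(curRpc);
        if (rpcsSeen.cardinality() < totalRpcs) {
          return Collections.emptyList();   // report not complete yet
        }

        // All RPCs arrived: any storage not stamped with this ID was missing
        // from the report and is treated as a zombie (mirrors
        // DatanodeDescriptor.removeZombieStorages).
        List<String> zombies = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it =
            lastReportIdByStorage.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry<String, Long> e = it.next();
          if (e.getValue() != reportId) {
            zombies.add(e.getKey());
            it.remove();
          }
        }
        curReportId = 0;
        rpcsSeen = null;
        return zombies;
      }
    }

Constructing the sketch with the NameNode's known storages for a DataNode and feeding it one call per storage report RPC reproduces, in miniature, the decision BlockManager.processReport makes once rpcsSeen reaches totalRpcs.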


@ -106,6 +106,9 @@ Release 2.6.1 - UNRELEASED
provided by the client is larger than the one stored in the datanode.
(Brahma Reddy Battula via szetszwo)
HDFS-7960. The full block report should prune zombie storages even if
they're not empty. (cmccabe and Eddy Xu via wang)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES


@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlo
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -156,7 +157,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports) throws IOException {
String poolId, StorageBlockReport[] reports, BlockReportContext context)
throws IOException {
BlockReportRequestProto.Builder builder = BlockReportRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId);
@ -170,6 +172,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
}
builder.addReports(reportBuilder.build());
}
builder.setContext(PBHelper.convert(context));
BlockReportResponseProto resp;
try {
resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());


@ -150,7 +150,9 @@ public class DatanodeProtocolServerSideTranslatorPB implements
}
try {
cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
request.getBlockPoolId(), report);
request.getBlockPoolId(), report,
request.hasContext() ?
PBHelper.convert(request.getContext()) : null);
} catch (IOException e) {
throw new ServiceException(e);
}


@ -110,10 +110,12 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rollin
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
@ -193,6 +195,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@ -2901,4 +2904,16 @@ public class PBHelper {
ezKeyVersionName);
}
public static BlockReportContext convert(BlockReportContextProto proto) {
return new BlockReportContext(proto.getTotalRpcs(),
proto.getCurRpc(), proto.getId());
}
public static BlockReportContextProto convert(BlockReportContext context) {
return BlockReportContextProto.newBuilder().
setTotalRpcs(context.getTotalRpcs()).
setCurRpc(context.getCurRpc()).
setId(context.getReportId()).
build();
}
}


@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@ -1783,7 +1784,8 @@ public class BlockManager {
*/
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
final BlockListAsLongs newReport) throws IOException {
final BlockListAsLongs newReport, BlockReportContext context,
boolean lastStorageInRpc) throws IOException {
namesystem.writeLock();
final long startTime = Time.now(); //after acquiring write lock
final long endTime;
@ -1831,7 +1833,31 @@ public class BlockManager {
+ "contents are no longer considered stale");
rescanPostponedMisreplicatedBlocks();
}
if (context != null) {
storageInfo.setLastBlockReportId(context.getReportId());
if (lastStorageInRpc) {
int rpcsSeen = node.updateBlockReportContext(context);
if (rpcsSeen >= context.getTotalRpcs()) {
List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
if (zombies.isEmpty()) {
LOG.debug("processReport 0x" +
Long.toHexString(context.getReportId()) +
": no zombie storages found.");
} else {
for (DatanodeStorageInfo zombie : zombies) {
removeZombieReplicas(context, zombie);
}
}
node.clearBlockReportContext();
} else {
LOG.debug("processReport 0x" +
Long.toHexString(context.getReportId()) + ": " +
(context.getTotalRpcs() - rpcsSeen) +
" more RPCs remaining in this report.");
}
}
}
} finally {
endTime = Time.now();
namesystem.writeUnlock();
@ -1857,6 +1883,31 @@ public class BlockManager {
return !node.hasStaleStorages();
}
private void removeZombieReplicas(BlockReportContext context,
DatanodeStorageInfo zombie) {
LOG.warn("processReport 0x" + Long.toHexString(context.getReportId()) +
": removing zombie storage " + zombie.getStorageID() +
", which no longer exists on the DataNode.");
assert(namesystem.hasWriteLock());
Iterator<BlockInfo> iter = zombie.getBlockIterator();
int prevBlocks = zombie.numBlocks();
while (iter.hasNext()) {
BlockInfo block = iter.next();
// We assume that a block can be on only one storage in a DataNode.
// That's why we pass in the DatanodeDescriptor rather than the
// DatanodeStorageInfo.
// TODO: remove this assumption in case we want to put a block on
// more than one storage on a datanode (and because it's a difficult
// assumption to really enforce)
removeStoredBlock(block, zombie.getDatanodeDescriptor());
invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
}
assert(zombie.numBlocks() == 0);
LOG.warn("processReport 0x" + Long.toHexString(context.getReportId()) +
": removed " + prevBlocks + " replicas from storage " +
zombie.getStorageID() + ", which no longer exists on the DataNode.");
}
/**
* Rescan the list of blocks which were previously postponed.
*/


@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -31,6 +32,7 @@ import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.EnumCounters;
@ -63,7 +66,25 @@ public class DatanodeDescriptor extends DatanodeInfo {
// Stores status of decommissioning.
// If node is not decommissioning, do not use this object for anything.
public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus();
private long curBlockReportId = 0;
private BitSet curBlockReportRpcsSeen = null;
public int updateBlockReportContext(BlockReportContext context) {
if (curBlockReportId != context.getReportId()) {
curBlockReportId = context.getReportId();
curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
}
curBlockReportRpcsSeen.set(context.getCurRpc());
return curBlockReportRpcsSeen.cardinality();
}
public void clearBlockReportContext() {
curBlockReportId = 0;
curBlockReportRpcsSeen = null;
}
/** Block and targets pair */
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -282,6 +303,34 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
static final private List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
ImmutableList.of();
List<DatanodeStorageInfo> removeZombieStorages() {
List<DatanodeStorageInfo> zombies = null;
synchronized (storageMap) {
Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
storageMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
DatanodeStorageInfo storageInfo = entry.getValue();
if (storageInfo.getLastBlockReportId() != curBlockReportId) {
LOG.info(storageInfo.getStorageID() + " had lastBlockReportId 0x" +
Long.toHexString(storageInfo.getLastBlockReportId()) +
", but curBlockReportId = 0x" +
Long.toHexString(curBlockReportId));
iter.remove();
if (zombies == null) {
zombies = new LinkedList<DatanodeStorageInfo>();
}
zombies.add(storageInfo);
}
storageInfo.setLastBlockReportId(0);
}
}
return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
}
/**
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.


@ -115,6 +115,9 @@ public class DatanodeStorageInfo {
private volatile BlockInfo blockList = null;
private int numBlocks = 0;
// The ID of the last full block report which updated this storage.
private long lastBlockReportId = 0;
/** The number of block reports received */
private int blockReportCount = 0;
@ -178,7 +181,15 @@ public class DatanodeStorageInfo {
this.remaining = remaining;
this.blockPoolUsed = blockPoolUsed;
}
long getLastBlockReportId() {
return lastBlockReportId;
}
void setLastBlockReportId(long lastBlockReportId) {
this.lastBlockReportId = lastBlockReportId;
}
State getState() {
return this.state;
}


@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -449,6 +450,17 @@ class BPServiceActor implements Runnable {
return sendImmediateIBR;
}
private long prevBlockReportId = 0;
private long generateUniqueBlockReportId() {
long id = System.nanoTime();
if (id <= prevBlockReportId) {
id = prevBlockReportId + 1;
}
prevBlockReportId = id;
return id;
}
/**
* Report the list blocks to the Namenode
* @return DatanodeCommands returned by the NN. May be null.
@ -492,11 +504,13 @@ class BPServiceActor implements Runnable {
int numRPCs = 0;
boolean success = false;
long brSendStartTime = now();
long reportId = generateUniqueBlockReportId();
try {
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
// Below split threshold, send all reports in a single message.
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), reports);
bpRegistration, bpos.getBlockPoolId(), reports,
new BlockReportContext(1, 0, reportId));
numRPCs = 1;
numReportsSent = reports.length;
if (cmd != null) {
@ -504,10 +518,11 @@ class BPServiceActor implements Runnable {
}
} else {
// Send one block report per message.
for (StorageBlockReport report : reports) {
StorageBlockReport singleReport[] = { report };
for (int r = 0; r < reports.length; r++) {
StorageBlockReport singleReport[] = { reports[r] };
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport);
bpRegistration, bpos.getBlockPoolId(), singleReport,
new BlockReportContext(reports.length, r, reportId));
numReportsSent++;
numRPCs++;
if (cmd != null) {
@ -523,11 +538,12 @@ class BPServiceActor implements Runnable {
dn.getMetrics().addBlockReport(brSendCost);
final int nCmds = cmds.size();
LOG.info((success ? "S" : "Uns") +
"uccessfully sent " + numReportsSent +
" of " + reports.length +
" blockreports for " + totalBlockCount +
" total blocks using " + numRPCs +
" RPCs. This took " + brCreateCost +
"uccessfully sent block report 0x" +
Long.toHexString(reportId) + ", containing " + reports.length +
" storage report(s), of which we sent " + numReportsSent + "." +
" The reports had " + totalBlockCount +
" total blocks and used " + numRPCs +
" RPC(s). This took " + brCreateCost +
" msec to generate and " + brSendCost +
" msecs for RPC and NN processing." +
" Got back " +

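The BPServiceActor changes above derive the report ID from System.nanoTime() and bump it whenever the clock has not advanced past the previous value, so IDs stay unique and increasing within one actor; every RPC of a split report then reuses that single ID. A small self-contained illustration of the same pattern (hypothetical class name, not the actual BPServiceActor):

    // Illustrative only: monotonically increasing report IDs plus per-RPC
    // context values for a report split into one RPC per storage.
    class ReportIdSketch {
      private long prevId = 0;

      long nextReportId() {
        long id = System.nanoTime();
        if (id <= prevId) {      // nanoTime did not advance; force uniqueness
          id = prevId + 1;
        }
        prevId = id;
        return id;
      }

      public static void main(String[] args) {
        ReportIdSketch sketch = new ReportIdSketch();
        long reportId = sketch.nextReportId();
        int totalRpcs = 3;       // e.g. three storages, one RPC each
        for (int r = 0; r < totalRpcs; r++) {
          // Every RPC of the same report reuses reportId, so the NameNode
          // can tell its pieces apart from those of a later report.
          System.out.printf("rpc %d of %d, report 0x%x%n", r, totalRpcs, reportId);
        }
      }
    }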

@ -118,6 +118,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -1142,7 +1143,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // DatanodeProtocol
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
String poolId, StorageBlockReport[] reports) throws IOException {
String poolId, StorageBlockReport[] reports,
BlockReportContext context) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
if(blockStateChangeLog.isDebugEnabled()) {
@ -1151,14 +1153,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
final BlockManager bm = namesystem.getBlockManager();
boolean noStaleStorages = false;
for(StorageBlockReport r : reports) {
final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
for (int r = 0; r < reports.length; r++) {
final BlockListAsLongs blocks =
new BlockListAsLongs(reports[r].getBlocks());
//
// BlockManager.processReport accumulates information of prior calls
// for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage.
//
noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks);
noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
blocks, context, (r == reports.length - 1));
metrics.incrStorageBlockReportOps();
}


@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
/**
* The context of the block report.
*
* This is a set of fields that the Datanode sends to provide context about a
* block report RPC. The context includes a unique 64-bit ID which
* identifies the block report as a whole. It also includes the total number
* of RPCs which this block report is split into, and the index into that
* total for the current RPC.
*/
public class BlockReportContext {
private final int totalRpcs;
private final int curRpc;
private final long reportId;
public BlockReportContext(int totalRpcs, int curRpc, long reportId) {
this.totalRpcs = totalRpcs;
this.curRpc = curRpc;
this.reportId = reportId;
}
public int getTotalRpcs() {
return totalRpcs;
}
public int getCurRpc() {
return curRpc;
}
public long getReportId() {
return reportId;
}
}
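A short usage sketch for the new class, mirroring how the BPServiceActor diff earlier in this commit builds contexts (illustrative only; numStorageReports and the printing are assumptions, not Hadoop code):

    import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;

    public class BlockReportContextUsageSketch {
      public static void main(String[] args) {
        long reportId = System.nanoTime();   // stand-in for the DataNode's ID generator
        int numStorageReports = 3;           // hypothetical storage count

        // Below the split threshold: one RPC carries every storage report.
        BlockReportContext combined = new BlockReportContext(1, 0, reportId);
        System.out.println("combined: rpc " + combined.getCurRpc()
            + " of " + combined.getTotalRpcs());

        // Above the threshold: one RPC per storage, all sharing reportId.
        for (int r = 0; r < numStorageReports; r++) {
          BlockReportContext perRpc =
              new BlockReportContext(numStorageReports, r, reportId);
          System.out.println("split: rpc " + perRpc.getCurRpc()
              + " of " + perRpc.getTotalRpcs()
              + ", id 0x" + Long.toHexString(perRpc.getReportId()));
        }
      }
    }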


@ -23,7 +23,6 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -125,20 +124,23 @@ public interface DatanodeProtocol {
* Each finalized block is represented as 3 longs. Each under-
* construction replica is represented as 4 longs.
* This is done instead of Block[] to reduce memory used by block reports.
*
* @param reports report of blocks per storage
* @param context Context information for this block report.
*
* @return - the next command for DN to process.
* @throws IOException
*/
@Idempotent
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports) throws IOException;
String poolId, StorageBlockReport[] reports,
BlockReportContext context) throws IOException;
/**
* Communicates the complete list of locally cached blocks to the NameNode.
*
* This method is similar to
* {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])},
* {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[], BlockReportContext)},
* which is used to communicated blocks stored on disk.
*
* @param The datanode registration.

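The DatanodeProtocol javadoc above notes that block reports encode each finalized replica as three longs instead of Block objects to reduce memory use. A rough illustration of that packing idea follows; it is not the actual BlockListAsLongs wire layout (which also carries header fields and a separate under-construction section), just a sketch of why a flat long[] is cheaper than an object array:

    // Illustrative packing of finalized replicas into a flat long[]:
    // three longs per block, no per-block objects or pointer indirection.
    class PackedBlockListSketch {
      static long[] pack(long[][] blocks) {   // each row: {id, length, genStamp}
        long[] packed = new long[blocks.length * 3];
        for (int i = 0; i < blocks.length; i++) {
          packed[3 * i]     = blocks[i][0];   // block ID
          packed[3 * i + 1] = blocks[i][1];   // length in bytes
          packed[3 * i + 2] = blocks[i][2];   // generation stamp
        }
        return packed;
      }

      public static void main(String[] args) {
        long[] packed = pack(new long[][] {
            {1073741825L, 134217728L, 1001L},
            {1073741826L,  67108864L, 1002L},
        });
        System.out.println(packed.length / 3 + " blocks packed into "
            + packed.length + " longs");
      }
    }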

@ -212,11 +212,25 @@ message HeartbeatResponseProto {
* second long represents length
* third long represents gen stamp
* fourth long (if under construction) represents replica state
* context - An optional field containing information about the context
* of this block report.
*/
message BlockReportRequestProto {
required DatanodeRegistrationProto registration = 1;
required string blockPoolId = 2;
repeated StorageBlockReportProto reports = 3;
optional BlockReportContextProto context = 4;
}
message BlockReportContextProto {
// The total number of RPCs this block report is broken into.
required int32 totalRpcs = 1;
// The index of the current RPC (zero-based)
required int32 curRpc = 2;
// The unique 64-bit ID of this block report
required int64 id = 3;
}
/**


@ -560,12 +560,12 @@ public class TestBlockManager {
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
new BlockListAsLongs(null, null), null, false);
assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
new BlockListAsLongs(null, null), null, false);
assertEquals(1, ds.getBlockReportCount());
// re-register as if node restarted, should update existing node
@ -576,7 +576,7 @@ public class TestBlockManager {
// send block report, should be processed after restart
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
new BlockListAsLongs(null, null), null, false);
// Reinitialize as registration with empty storage list pruned
// node.storageMap.
ds = node.getStorageInfos()[0];
@ -608,7 +608,7 @@ public class TestBlockManager {
reset(node);
doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
new BlockListAsLongs(null, null), null, false);
assertEquals(1, ds.getBlockReportCount());
}


@ -18,27 +18,41 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Supplier;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.stat.inference.TestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
public class TestNameNodePrunesMissingStorages {
@ -110,7 +124,9 @@ public class TestNameNodePrunesMissingStorages {
}
/**
* Verify that the NameNode does not prune storages with blocks.
* Verify that the NameNode does not prune storages with blocks
* simply as a result of a heartbeat being sent missing that storage.
*
* @throws IOException
*/
@Test (timeout=300000)
@ -118,4 +134,119 @@ public class TestNameNodePrunesMissingStorages {
// Run the test with 1 storage, after the text still expect 1 storage.
runTest(GenericTestUtils.getMethodName(), true, 1, 1);
}
/**
* Regression test for HDFS-7960.<p/>
*
* Shutting down a datanode, removing a storage directory, and restarting
* the DataNode should not produce zombie storages.
*/
@Test(timeout=300000)
public void testRemovingStorageDoesNotProduceZombies() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
final int NUM_STORAGES_PER_DN = 2;
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf).numDataNodes(3)
.storagesPerDatanode(NUM_STORAGES_PER_DN)
.build();
try {
cluster.waitActive();
for (DataNode dn : cluster.getDataNodes()) {
assertEquals(NUM_STORAGES_PER_DN,
cluster.getNamesystem().getBlockManager().
getDatanodeManager().getDatanode(dn.getDatanodeId()).
getStorageInfos().length);
}
// Create a file which will end up on all 3 datanodes.
final Path TEST_PATH = new Path("/foo1");
DistributedFileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH, 1024, (short) 3, 0xcafecafe);
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.triggerBlockReport(dn);
}
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/foo1"));
cluster.getNamesystem().writeLock();
final String storageIdToRemove;
String datanodeUuid;
// Find the first storage which this block is in.
try {
Iterator<DatanodeStorageInfo> storageInfoIter =
cluster.getNamesystem().getBlockManager().
getStorages(block.getLocalBlock()).iterator();
assertTrue(storageInfoIter.hasNext());
DatanodeStorageInfo info = storageInfoIter.next();
storageIdToRemove = info.getStorageID();
datanodeUuid = info.getDatanodeDescriptor().getDatanodeUuid();
} finally {
cluster.getNamesystem().writeUnlock();
}
// Find the DataNode which holds that first storage.
final DataNode datanodeToRemoveStorageFrom;
int datanodeToRemoveStorageFromIdx = 0;
while (true) {
if (datanodeToRemoveStorageFromIdx >= cluster.getDataNodes().size()) {
Assert.fail("failed to find datanode with uuid " + datanodeUuid);
datanodeToRemoveStorageFrom = null;
break;
}
DataNode dn = cluster.getDataNodes().
get(datanodeToRemoveStorageFromIdx);
if (dn.getDatanodeUuid().equals(datanodeUuid)) {
datanodeToRemoveStorageFrom = dn;
break;
}
datanodeToRemoveStorageFromIdx++;
}
// Find the volume within the datanode which holds that first storage.
List<? extends FsVolumeSpi> volumes =
datanodeToRemoveStorageFrom.getFSDataset().getVolumes();
assertEquals(NUM_STORAGES_PER_DN, volumes.size());
String volumeDirectoryToRemove = null;
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageID().equals(storageIdToRemove)) {
volumeDirectoryToRemove = volume.getBasePath();
}
}
// Shut down the datanode and remove the volume.
// Replace the volume directory with a regular file, which will
// cause a volume failure. (If we merely removed the directory,
// it would be re-initialized with a new storage ID.)
assertNotNull(volumeDirectoryToRemove);
datanodeToRemoveStorageFrom.shutdown();
FileUtil.fullyDelete(new File(volumeDirectoryToRemove));
FileOutputStream fos = new FileOutputStream(volumeDirectoryToRemove);
try {
fos.write(1);
} finally {
fos.close();
}
cluster.restartDataNode(datanodeToRemoveStorageFromIdx);
// Wait for the NameNode to remove the storage.
LOG.info("waiting for the datanode to remove " + storageIdToRemove);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
final DatanodeDescriptor dnDescriptor =
cluster.getNamesystem().getBlockManager().getDatanodeManager().
getDatanode(datanodeToRemoveStorageFrom.getDatanodeUuid());
assertNotNull(dnDescriptor);
DatanodeStorageInfo[] infos = dnDescriptor.getStorageInfos();
for (DatanodeStorageInfo info : infos) {
if (info.getStorageID().equals(storageIdToRemove)) {
LOG.info("Still found storage " + storageIdToRemove + " on " +
info + ".");
return false;
}
}
assertEquals(NUM_STORAGES_PER_DN - 1, infos.length);
return true;
}
}, 10, 30000);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}


@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -603,7 +604,8 @@ public abstract class BlockReportTestBase {
.when(spy).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
// Force a block report to be generated. The block report will have
// an RBW replica in it. Wait for the RPC to be sent, but block


@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -209,7 +210,8 @@ public class TestBPOfferService {
.when(mockNN2).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.eq(FAKE_BPID),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
bpos.start();
try {
@ -399,7 +401,8 @@ public class TestBPOfferService {
Mockito.verify(mockNN).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.eq(FAKE_BPID),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
return true;
} catch (Throwable t) {
LOG.info("waiting on block report: " + t.getMessage());
@ -424,7 +427,8 @@ public class TestBPOfferService {
Mockito.verify(mockNN).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.eq(FAKE_BPID),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
return true;
} catch (Throwable t) {
LOG.info("waiting on block report: " + t.getMessage());


@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -121,7 +122,8 @@ public class TestBlockHasMultipleReplicasOnSameDN {
}
// Should not assert!
cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports);
cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
new BlockReportContext(1, 0, System.nanoTime()));
// Get the block locations once again.
locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);


@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -184,7 +185,7 @@ public class TestDataNodeVolumeFailure {
new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
}
cluster.getNameNodeRpc().blockReport(dnR, bpid, reports);
cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);
// verify number of blocks and files...
verify(filename, filesize);


@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@ -135,7 +136,8 @@ public class TestDatanodeProtocolRetryPolicy {
Mockito.verify(mockNN).blockReport(
Mockito.eq(datanodeRegistration),
Mockito.eq(POOL_ID),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
return true;
} catch (Throwable t) {
LOG.info("waiting on block report: " + t.getMessage());


@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
@ -133,7 +134,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
any(DatanodeRegistration.class),
anyString(),
captor.capture());
captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}
@ -165,7 +166,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
Mockito.verify(nnSpy, times(1)).blockReport(
any(DatanodeRegistration.class),
anyString(),
captor.capture());
captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
}
@ -197,7 +198,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
any(DatanodeRegistration.class),
anyString(),
captor.capture());
captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}


@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.util.Time;
/**
@ -33,10 +35,13 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
@Override
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException {
int i = 0;
for (StorageBlockReport report : reports) {
LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report };
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
new BlockReportContext(reports.length, i, System.nanoTime()));
i++;
}
}
}


@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -34,6 +35,7 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException {
LOG.info("Sending combined block reports for " + dnR);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
new BlockReportContext(1, 0, System.nanoTime()));
}
}


@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -938,7 +939,8 @@ public class NNThroughputBenchmark implements Tool {
new BlockListAsLongs(null, null).getBlockListAsLongs())
};
nameNodeProto.blockReport(dnRegistration,
nameNode.getNamesystem().getBlockPoolId(), reports);
nameNode.getNamesystem().getBlockPoolId(), reports,
new BlockReportContext(1, 0, System.nanoTime()));
}
/**
@ -1181,8 +1183,9 @@ public class NNThroughputBenchmark implements Tool {
long start = Time.now();
StorageBlockReport[] report = { new StorageBlockReport(
dn.storage, dn.getBlockReportList()) };
nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
.getBlockPoolId(), report);
nameNodeProto.blockReport(dn.dnRegistration,
nameNode.getNamesystem().getBlockPoolId(), report,
new BlockReportContext(1, 0, System.nanoTime()));
long end = Time.now();
return end-start;
}


@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -132,7 +133,8 @@ public class TestDeadDatanode {
new DatanodeStorage(reg.getDatanodeUuid()),
new long[] { 0L, 0L, 0L }) };
try {
dnp.blockReport(reg, poolId, report);
dnp.blockReport(reg, poolId, report,
new BlockReportContext(1, 0, System.nanoTime()));
fail("Expected IOException is not thrown");
} catch (IOException ex) {
// Expected


@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.IOUtils;
@ -537,7 +538,8 @@ public class TestDNFencing {
.when(spy).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
dn.scheduleAllBlockReport(0);
delayer.waitForCall();