HDFS-7960. The full block report should prune zombie storages even if they're not empty. Contributed by Colin McCabe and Eddy Xu.

This commit is contained in:
Andrew Wang 2015-03-23 22:00:34 -07:00
parent d7e3c3364e
commit 50ee8f4e67
27 changed files with 433 additions and 49 deletions

View File

@ -1241,6 +1241,9 @@ Release 2.7.0 - UNRELEASED
provided by the client is larger than the one stored in the datanode.
(Brahma Reddy Battula via szetszwo)
HDFS-7960. The full block report should prune zombie storages even if
they're not empty. (cmccabe and Eddy Xu via wang)
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlo
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -169,7 +170,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports) throws IOException {
String poolId, StorageBlockReport[] reports, BlockReportContext context)
throws IOException {
BlockReportRequestProto.Builder builder = BlockReportRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId);
@ -191,6 +193,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
}
builder.addReports(reportBuilder.build());
}
builder.setContext(PBHelper.convert(context));
BlockReportResponseProto resp;
try {
resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());

View File

@ -161,7 +161,9 @@ public class DatanodeProtocolServerSideTranslatorPB implements
}
try {
cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
request.getBlockPoolId(), report);
request.getBlockPoolId(), report,
request.hasContext() ?
PBHelper.convert(request.getContext()) : null);
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -111,6 +111,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rollin
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
@ -123,6 +124,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHe
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@ -194,6 +196,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@ -3009,4 +3012,16 @@ public class PBHelper {
return targetPinnings;
}
public static BlockReportContext convert(BlockReportContextProto proto) {
return new BlockReportContext(proto.getTotalRpcs(),
proto.getCurRpc(), proto.getId());
}
public static BlockReportContextProto convert(BlockReportContext context) {
return BlockReportContextProto.newBuilder().
setTotalRpcs(context.getTotalRpcs()).
setCurRpc(context.getCurRpc()).
setId(context.getReportId()).
build();
}
}

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@ -1770,7 +1771,8 @@ public class BlockManager {
*/
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
final BlockListAsLongs newReport) throws IOException {
final BlockListAsLongs newReport, BlockReportContext context,
boolean lastStorageInRpc) throws IOException {
namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
@ -1809,6 +1811,29 @@ public class BlockManager {
}
storageInfo.receivedBlockReport();
if (context != null) {
storageInfo.setLastBlockReportId(context.getReportId());
if (lastStorageInRpc) {
int rpcsSeen = node.updateBlockReportContext(context);
if (rpcsSeen >= context.getTotalRpcs()) {
List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
if (zombies.isEmpty()) {
LOG.debug("processReport 0x{}: no zombie storages found.",
Long.toHexString(context.getReportId()));
} else {
for (DatanodeStorageInfo zombie : zombies) {
removeZombieReplicas(context, zombie);
}
}
node.clearBlockReportContext();
} else {
LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
"report.", Long.toHexString(context.getReportId()),
(context.getTotalRpcs() - rpcsSeen)
);
}
}
}
} finally {
endTime = Time.monotonicNow();
namesystem.writeUnlock();
@ -1833,6 +1858,32 @@ public class BlockManager {
return !node.hasStaleStorages();
}
private void removeZombieReplicas(BlockReportContext context,
DatanodeStorageInfo zombie) {
LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
"longer exists on the DataNode.",
Long.toHexString(context.getReportId()), zombie.getStorageID());
assert(namesystem.hasWriteLock());
Iterator<BlockInfoContiguous> iter = zombie.getBlockIterator();
int prevBlocks = zombie.numBlocks();
while (iter.hasNext()) {
BlockInfoContiguous block = iter.next();
// We assume that a block can be on only one storage in a DataNode.
// That's why we pass in the DatanodeDescriptor rather than the
// DatanodeStorageInfo.
// TODO: remove this assumption in case we want to put a block on
// more than one storage on a datanode (and because it's a difficult
// assumption to really enforce)
removeStoredBlock(block, zombie.getDatanodeDescriptor());
invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
}
assert(zombie.numBlocks() == 0);
LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
"which no longer exists on the DataNode.",
Long.toHexString(context.getReportId()), prevBlocks,
zombie.getStorageID());
}
/**
* Rescan the list of blocks which were previously postponed.
*/

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -31,6 +32,7 @@ import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@ -62,7 +65,25 @@ public class DatanodeDescriptor extends DatanodeInfo {
// Stores status of decommissioning.
// If node is not decommissioning, do not use this object for anything.
public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus();
private long curBlockReportId = 0;
private BitSet curBlockReportRpcsSeen = null;
public int updateBlockReportContext(BlockReportContext context) {
if (curBlockReportId != context.getReportId()) {
curBlockReportId = context.getReportId();
curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
}
curBlockReportRpcsSeen.set(context.getCurRpc());
return curBlockReportRpcsSeen.cardinality();
}
public void clearBlockReportContext() {
curBlockReportId = 0;
curBlockReportRpcsSeen = null;
}
/** Block and targets pair */
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -282,6 +303,34 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
static final private List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
ImmutableList.of();
List<DatanodeStorageInfo> removeZombieStorages() {
List<DatanodeStorageInfo> zombies = null;
synchronized (storageMap) {
Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
storageMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
DatanodeStorageInfo storageInfo = entry.getValue();
if (storageInfo.getLastBlockReportId() != curBlockReportId) {
LOG.info(storageInfo.getStorageID() + " had lastBlockReportId 0x" +
Long.toHexString(storageInfo.getLastBlockReportId()) +
", but curBlockReportId = 0x" +
Long.toHexString(curBlockReportId));
iter.remove();
if (zombies == null) {
zombies = new LinkedList<DatanodeStorageInfo>();
}
zombies.add(storageInfo);
}
storageInfo.setLastBlockReportId(0);
}
}
return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
}
/**
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.

View File

@ -115,6 +115,9 @@ public class DatanodeStorageInfo {
private volatile BlockInfoContiguous blockList = null;
private int numBlocks = 0;
// The ID of the last full block report which updated this storage.
private long lastBlockReportId = 0;
/** The number of block reports received */
private int blockReportCount = 0;
@ -178,7 +181,15 @@ public class DatanodeStorageInfo {
this.remaining = remaining;
this.blockPoolUsed = blockPoolUsed;
}
long getLastBlockReportId() {
return lastBlockReportId;
}
void setLastBlockReportId(long lastBlockReportId) {
this.lastBlockReportId = lastBlockReportId;
}
State getState() {
return this.state;
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -434,6 +435,17 @@ class BPServiceActor implements Runnable {
return sendImmediateIBR;
}
private long prevBlockReportId = 0;
private long generateUniqueBlockReportId() {
long id = System.nanoTime();
if (id <= prevBlockReportId) {
id = prevBlockReportId + 1;
}
prevBlockReportId = id;
return id;
}
/**
* Report the list blocks to the Namenode
* @return DatanodeCommands returned by the NN. May be null.
@ -476,11 +488,13 @@ class BPServiceActor implements Runnable {
int numRPCs = 0;
boolean success = false;
long brSendStartTime = monotonicNow();
long reportId = generateUniqueBlockReportId();
try {
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
// Below split threshold, send all reports in a single message.
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), reports);
bpRegistration, bpos.getBlockPoolId(), reports,
new BlockReportContext(1, 0, reportId));
numRPCs = 1;
numReportsSent = reports.length;
if (cmd != null) {
@ -488,10 +502,11 @@ class BPServiceActor implements Runnable {
}
} else {
// Send one block report per message.
for (StorageBlockReport report : reports) {
StorageBlockReport singleReport[] = { report };
for (int r = 0; r < reports.length; r++) {
StorageBlockReport singleReport[] = { reports[r] };
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport);
bpRegistration, bpos.getBlockPoolId(), singleReport,
new BlockReportContext(reports.length, r, reportId));
numReportsSent++;
numRPCs++;
if (cmd != null) {
@ -507,11 +522,12 @@ class BPServiceActor implements Runnable {
dn.getMetrics().addBlockReport(brSendCost);
final int nCmds = cmds.size();
LOG.info((success ? "S" : "Uns") +
"uccessfully sent " + numReportsSent +
" of " + reports.length +
" blockreports for " + totalBlockCount +
" total blocks using " + numRPCs +
" RPCs. This took " + brCreateCost +
"uccessfully sent block report 0x" +
Long.toHexString(reportId) + ", containing " + reports.length +
" storage report(s), of which we sent " + numReportsSent + "." +
" The reports had " + totalBlockCount +
" total blocks and used " + numRPCs +
" RPC(s). This took " + brCreateCost +
" msec to generate and " + brSendCost +
" msecs for RPC and NN processing." +
" Got back " +

View File

@ -121,6 +121,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -1292,7 +1293,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // DatanodeProtocol
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
String poolId, StorageBlockReport[] reports) throws IOException {
String poolId, StorageBlockReport[] reports,
BlockReportContext context) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
if(blockStateChangeLog.isDebugEnabled()) {
@ -1301,14 +1303,15 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
final BlockManager bm = namesystem.getBlockManager();
boolean noStaleStorages = false;
for(StorageBlockReport r : reports) {
final BlockListAsLongs blocks = r.getBlocks();
for (int r = 0; r < reports.length; r++) {
final BlockListAsLongs blocks = reports[r].getBlocks();
//
// BlockManager.processReport accumulates information of prior calls
// for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage.
//
noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks);
noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
blocks, context, (r == reports.length - 1));
metrics.incrStorageBlockReportOps();
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
/**
* The context of the block report.
*
* This is a set of fields that the Datanode sends to provide context about a
* block report RPC. The context includes a unique 64-bit ID which
* identifies the block report as a whole. It also includes the total number
* of RPCs which this block report is split into, and the index into that
* total for the current RPC.
*/
public class BlockReportContext {
private final int totalRpcs;
private final int curRpc;
private final long reportId;
public BlockReportContext(int totalRpcs, int curRpc, long reportId) {
this.totalRpcs = totalRpcs;
this.curRpc = curRpc;
this.reportId = reportId;
}
public int getTotalRpcs() {
return totalRpcs;
}
public int getCurRpc() {
return curRpc;
}
public long getReportId() {
return reportId;
}
}

View File

@ -23,7 +23,6 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -128,20 +127,23 @@ public interface DatanodeProtocol {
* Each finalized block is represented as 3 longs. Each under-
* construction replica is represented as 4 longs.
* This is done instead of Block[] to reduce memory used by block reports.
*
* @param reports report of blocks per storage
* @param context Context information for this block report.
*
* @return - the next command for DN to process.
* @throws IOException
*/
@Idempotent
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports) throws IOException;
String poolId, StorageBlockReport[] reports,
BlockReportContext context) throws IOException;
/**
* Communicates the complete list of locally cached blocks to the NameNode.
*
* This method is similar to
* {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])},
* {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[], BlockReportContext)},
* which is used to communicated blocks stored on disk.
*
* @param The datanode registration.

View File

@ -224,11 +224,25 @@ message HeartbeatResponseProto {
* second long represents length
* third long represents gen stamp
* fourth long (if under construction) represents replica state
* context - An optional field containing information about the context
* of this block report.
*/
message BlockReportRequestProto {
required DatanodeRegistrationProto registration = 1;
required string blockPoolId = 2;
repeated StorageBlockReportProto reports = 3;
optional BlockReportContextProto context = 4;
}
message BlockReportContextProto {
// The total number of RPCs this block report is broken into.
required int32 totalRpcs = 1;
// The index of the current RPC (zero-based)
required int32 curRpc = 2;
// The unique 64-bit ID of this block report
required int64 id = 3;
}
/**

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -219,7 +220,8 @@ public class TestBlockListAsLongs {
// check DN sends new-style BR
request.set(null);
nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
nn.blockReport(reg, "pool", sbr);
nn.blockReport(reg, "pool", sbr,
new BlockReportContext(1, 0, System.nanoTime()));
BlockReportRequestProto proto = request.get();
assertNotNull(proto);
assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@ -228,7 +230,8 @@ public class TestBlockListAsLongs {
// back up to prior version and check DN sends old-style BR
request.set(null);
nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
nn.blockReport(reg, "pool", sbr);
nn.blockReport(reg, "pool", sbr,
new BlockReportContext(1, 0, System.nanoTime()));
proto = request.get();
assertNotNull(proto);
assertFalse(proto.getReports(0).getBlocksList().isEmpty());

View File

@ -555,12 +555,12 @@ public class TestBlockManager {
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY);
BlockListAsLongs.EMPTY, null, false);
assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY);
BlockListAsLongs.EMPTY, null, false);
assertEquals(1, ds.getBlockReportCount());
// re-register as if node restarted, should update existing node
@ -571,7 +571,7 @@ public class TestBlockManager {
// send block report, should be processed after restart
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY);
BlockListAsLongs.EMPTY, null, false);
// Reinitialize as registration with empty storage list pruned
// node.storageMap.
ds = node.getStorageInfos()[0];
@ -600,7 +600,7 @@ public class TestBlockManager {
reset(node);
doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY);
BlockListAsLongs.EMPTY, null, false);
assertEquals(1, ds.getBlockReportCount());
}

View File

@ -18,26 +18,40 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.stat.inference.TestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
public class TestNameNodePrunesMissingStorages {
@ -110,7 +124,9 @@ public class TestNameNodePrunesMissingStorages {
}
/**
* Verify that the NameNode does not prune storages with blocks.
* Verify that the NameNode does not prune storages with blocks
* simply as a result of a heartbeat being sent missing that storage.
*
* @throws IOException
*/
@Test (timeout=300000)
@ -118,4 +134,119 @@ public class TestNameNodePrunesMissingStorages {
// Run the test with 1 storage, after the text still expect 1 storage.
runTest(GenericTestUtils.getMethodName(), true, 1, 1);
}
/**
* Regression test for HDFS-7960.<p/>
*
* Shutting down a datanode, removing a storage directory, and restarting
* the DataNode should not produce zombie storages.
*/
@Test(timeout=300000)
public void testRemovingStorageDoesNotProduceZombies() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
final int NUM_STORAGES_PER_DN = 2;
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf).numDataNodes(3)
.storagesPerDatanode(NUM_STORAGES_PER_DN)
.build();
try {
cluster.waitActive();
for (DataNode dn : cluster.getDataNodes()) {
assertEquals(NUM_STORAGES_PER_DN,
cluster.getNamesystem().getBlockManager().
getDatanodeManager().getDatanode(dn.getDatanodeId()).
getStorageInfos().length);
}
// Create a file which will end up on all 3 datanodes.
final Path TEST_PATH = new Path("/foo1");
DistributedFileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH, 1024, (short) 3, 0xcafecafe);
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.triggerBlockReport(dn);
}
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/foo1"));
cluster.getNamesystem().writeLock();
final String storageIdToRemove;
String datanodeUuid;
// Find the first storage which this block is in.
try {
Iterator<DatanodeStorageInfo> storageInfoIter =
cluster.getNamesystem().getBlockManager().
getStorages(block.getLocalBlock()).iterator();
assertTrue(storageInfoIter.hasNext());
DatanodeStorageInfo info = storageInfoIter.next();
storageIdToRemove = info.getStorageID();
datanodeUuid = info.getDatanodeDescriptor().getDatanodeUuid();
} finally {
cluster.getNamesystem().writeUnlock();
}
// Find the DataNode which holds that first storage.
final DataNode datanodeToRemoveStorageFrom;
int datanodeToRemoveStorageFromIdx = 0;
while (true) {
if (datanodeToRemoveStorageFromIdx >= cluster.getDataNodes().size()) {
Assert.fail("failed to find datanode with uuid " + datanodeUuid);
datanodeToRemoveStorageFrom = null;
break;
}
DataNode dn = cluster.getDataNodes().
get(datanodeToRemoveStorageFromIdx);
if (dn.getDatanodeUuid().equals(datanodeUuid)) {
datanodeToRemoveStorageFrom = dn;
break;
}
datanodeToRemoveStorageFromIdx++;
}
// Find the volume within the datanode which holds that first storage.
List<? extends FsVolumeSpi> volumes =
datanodeToRemoveStorageFrom.getFSDataset().getVolumes();
assertEquals(NUM_STORAGES_PER_DN, volumes.size());
String volumeDirectoryToRemove = null;
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageID().equals(storageIdToRemove)) {
volumeDirectoryToRemove = volume.getBasePath();
}
}
// Shut down the datanode and remove the volume.
// Replace the volume directory with a regular file, which will
// cause a volume failure. (If we merely removed the directory,
// it would be re-initialized with a new storage ID.)
assertNotNull(volumeDirectoryToRemove);
datanodeToRemoveStorageFrom.shutdown();
FileUtil.fullyDelete(new File(volumeDirectoryToRemove));
FileOutputStream fos = new FileOutputStream(volumeDirectoryToRemove);
try {
fos.write(1);
} finally {
fos.close();
}
cluster.restartDataNode(datanodeToRemoveStorageFromIdx);
// Wait for the NameNode to remove the storage.
LOG.info("waiting for the datanode to remove " + storageIdToRemove);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
final DatanodeDescriptor dnDescriptor =
cluster.getNamesystem().getBlockManager().getDatanodeManager().
getDatanode(datanodeToRemoveStorageFrom.getDatanodeUuid());
assertNotNull(dnDescriptor);
DatanodeStorageInfo[] infos = dnDescriptor.getStorageInfos();
for (DatanodeStorageInfo info : infos) {
if (info.getStorageID().equals(storageIdToRemove)) {
LOG.info("Still found storage " + storageIdToRemove + " on " +
info + ".");
return false;
}
}
assertEquals(NUM_STORAGES_PER_DN - 1, infos.length);
return true;
}
}, 10, 30000);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -613,7 +614,8 @@ public abstract class BlockReportTestBase {
.when(spy).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
// Force a block report to be generated. The block report will have
// an RBW replica in it. Wait for the RPC to be sent, but block

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -216,7 +217,8 @@ public class TestBPOfferService {
.when(mockNN2).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.eq(FAKE_BPID),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
bpos.start();
try {
@ -406,7 +408,8 @@ public class TestBPOfferService {
Mockito.verify(mockNN).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.eq(FAKE_BPID),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
return true;
} catch (Throwable t) {
LOG.info("waiting on block report: " + t.getMessage());
@ -431,7 +434,8 @@ public class TestBPOfferService {
Mockito.verify(mockNN).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.eq(FAKE_BPID),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
return true;
} catch (Throwable t) {
LOG.info("waiting on block report: " + t.getMessage());

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -122,7 +123,8 @@ public class TestBlockHasMultipleReplicasOnSameDN {
}
// Should not assert!
cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports);
cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
new BlockReportContext(1, 0, System.nanoTime()));
// Get the block locations once again.
locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -185,7 +186,7 @@ public class TestDataNodeVolumeFailure {
new StorageBlockReport(dnStorage, blockList);
}
cluster.getNameNodeRpc().blockReport(dnR, bpid, reports);
cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);
// verify number of blocks and files...
verify(filename, filesize);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@ -136,7 +137,8 @@ public class TestDatanodeProtocolRetryPolicy {
Mockito.verify(mockNN).blockReport(
Mockito.eq(datanodeRegistration),
Mockito.eq(POOL_ID),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
return true;
} catch (Throwable t) {
LOG.info("waiting on block report: " + t.getMessage());

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
@ -133,7 +134,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
any(DatanodeRegistration.class),
anyString(),
captor.capture());
captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}
@ -165,7 +166,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
Mockito.verify(nnSpy, times(1)).blockReport(
any(DatanodeRegistration.class),
anyString(),
captor.capture());
captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
}
@ -197,7 +198,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
any(DatanodeRegistration.class),
anyString(),
captor.capture());
captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}

View File

@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.util.Time;
/**
@ -33,10 +35,13 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
@Override
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException {
int i = 0;
for (StorageBlockReport report : reports) {
LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report };
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
new BlockReportContext(reports.length, i, System.nanoTime()));
i++;
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -34,6 +35,7 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException {
LOG.info("Sending combined block reports for " + dnR);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
new BlockReportContext(1, 0, System.nanoTime()));
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@ -76,7 +77,8 @@ public final class TestTriggerBlockReport {
Mockito.verify(spy, times(0)).blockReport(
any(DatanodeRegistration.class),
anyString(),
any(StorageBlockReport[].class));
any(StorageBlockReport[].class),
Mockito.<BlockReportContext>anyObject());
Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
any(DatanodeRegistration.class),
anyString(),
@ -113,7 +115,8 @@ public final class TestTriggerBlockReport {
Mockito.verify(spy, timeout(60000)).blockReport(
any(DatanodeRegistration.class),
anyString(),
any(StorageBlockReport[].class));
any(StorageBlockReport[].class),
Mockito.<BlockReportContext>anyObject());
}
cluster.shutdown();

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -939,7 +940,8 @@ public class NNThroughputBenchmark implements Tool {
new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
};
nameNodeProto.blockReport(dnRegistration,
nameNode.getNamesystem().getBlockPoolId(), reports);
nameNode.getNamesystem().getBlockPoolId(), reports,
new BlockReportContext(1, 0, System.nanoTime()));
}
/**
@ -1184,8 +1186,9 @@ public class NNThroughputBenchmark implements Tool {
long start = Time.now();
StorageBlockReport[] report = { new StorageBlockReport(
dn.storage, dn.getBlockReportList()) };
nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
.getBlockPoolId(), report);
nameNodeProto.blockReport(dn.dnRegistration,
nameNode.getNamesystem().getBlockPoolId(), report,
new BlockReportContext(1, 0, System.nanoTime()));
long end = Time.now();
return end-start;
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -107,7 +108,8 @@ public class TestDeadDatanode {
new DatanodeStorage(reg.getDatanodeUuid()),
BlockListAsLongs.EMPTY) };
try {
dnp.blockReport(reg, poolId, report);
dnp.blockReport(reg, poolId, report,
new BlockReportContext(1, 0, System.nanoTime()));
fail("Expected IOException is not thrown");
} catch (IOException ex) {
// Expected

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.IOUtils;
@ -547,7 +548,8 @@ public class TestDNFencing {
.when(spy).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(),
Mockito.<StorageBlockReport[]>anyObject());
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
dn.scheduleAllBlockReport(0);
delayer.waitForCall();