HDFS-7960. The full block report should prune zombie storages even if they're not empty. Contributed by Colin McCabe and Eddy Xu.
(cherry picked from commit 50ee8f4e67)
parent 87079cde7d
commit af0af28afc
@@ -916,6 +916,9 @@ Release 2.7.0 - UNRELEASED
     provided by the client is larger than the one stored in the datanode.
     (Brahma Reddy Battula via szetszwo)
 
+    HDFS-7960. The full block report should prune zombie storages even if
+    they're not empty. (cmccabe and Eddy Xu via wang)
+
   BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlo
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -169,7 +170,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
 
   @Override
   public DatanodeCommand blockReport(DatanodeRegistration registration,
-      String poolId, StorageBlockReport[] reports) throws IOException {
+      String poolId, StorageBlockReport[] reports, BlockReportContext context)
+        throws IOException {
     BlockReportRequestProto.Builder builder = BlockReportRequestProto
         .newBuilder().setRegistration(PBHelper.convert(registration))
         .setBlockPoolId(poolId);
@@ -191,6 +193,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       }
       builder.addReports(reportBuilder.build());
     }
+    builder.setContext(PBHelper.convert(context));
    BlockReportResponseProto resp;
     try {
       resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());
@@ -161,7 +161,9 @@ public class DatanodeProtocolServerSideTranslatorPB implements
     }
     try {
       cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
-          request.getBlockPoolId(), report);
+          request.getBlockPoolId(), report,
+          request.hasContext() ?
+              PBHelper.convert(request.getContext()) : null);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -111,6 +111,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rollin
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
@@ -123,6 +124,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHe
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -195,6 +197,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@@ -3010,4 +3013,16 @@ public class PBHelper {
     return targetPinnings;
   }
 
+  public static BlockReportContext convert(BlockReportContextProto proto) {
+    return new BlockReportContext(proto.getTotalRpcs(),
+        proto.getCurRpc(), proto.getId());
+  }
+
+  public static BlockReportContextProto convert(BlockReportContext context) {
+    return BlockReportContextProto.newBuilder().
+        setTotalRpcs(context.getTotalRpcs()).
+        setCurRpc(context.getCurRpc()).
+        setId(context.getReportId()).
+        build();
+  }
 }
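Note on the two convert() overloads added above: they are symmetric, so converting a BlockReportContext to its protobuf form and back should preserve all three fields. A minimal round-trip sketch, assuming the patched hadoop-hdfs classes are on the classpath (the literal values are arbitrary and chosen only for illustration):

    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;

    public class ContextRoundTrip {
      public static void main(String[] args) {
        BlockReportContext in = new BlockReportContext(4, 2, 0x1234abcdL);
        BlockReportContextProto proto = PBHelper.convert(in);  // Java -> protobuf
        BlockReportContext out = PBHelper.convert(proto);      // protobuf -> Java
        // Expect "4 2 1234abcd", matching the values put in above.
        System.out.println(out.getTotalRpcs() + " " + out.getCurRpc() + " "
            + Long.toHexString(out.getReportId()));
      }
    }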
@@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1773,7 +1774,8 @@ public class BlockManager {
    */
   public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage,
-      final BlockListAsLongs newReport) throws IOException {
+      final BlockListAsLongs newReport, BlockReportContext context,
+      boolean lastStorageInRpc) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
@@ -1812,6 +1814,29 @@
       }
 
       storageInfo.receivedBlockReport();
+      if (context != null) {
+        storageInfo.setLastBlockReportId(context.getReportId());
+        if (lastStorageInRpc) {
+          int rpcsSeen = node.updateBlockReportContext(context);
+          if (rpcsSeen >= context.getTotalRpcs()) {
+            List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
+            if (zombies.isEmpty()) {
+              LOG.debug("processReport 0x{}: no zombie storages found.",
+                  Long.toHexString(context.getReportId()));
+            } else {
+              for (DatanodeStorageInfo zombie : zombies) {
+                removeZombieReplicas(context, zombie);
+              }
+            }
+            node.clearBlockReportContext();
+          } else {
+            LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
+                "report.", Long.toHexString(context.getReportId()),
+                (context.getTotalRpcs() - rpcsSeen)
+            );
+          }
+        }
+      }
     } finally {
       endTime = Time.monotonicNow();
       namesystem.writeUnlock();
@@ -1836,6 +1861,32 @@
     return !node.hasStaleStorages();
   }
 
+  private void removeZombieReplicas(BlockReportContext context,
+      DatanodeStorageInfo zombie) {
+    LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
+        "longer exists on the DataNode.",
+        Long.toHexString(context.getReportId()), zombie.getStorageID());
+    assert(namesystem.hasWriteLock());
+    Iterator<BlockInfoContiguous> iter = zombie.getBlockIterator();
+    int prevBlocks = zombie.numBlocks();
+    while (iter.hasNext()) {
+      BlockInfoContiguous block = iter.next();
+      // We assume that a block can be on only one storage in a DataNode.
+      // That's why we pass in the DatanodeDescriptor rather than the
+      // DatanodeStorageInfo.
+      // TODO: remove this assumption in case we want to put a block on
+      // more than one storage on a datanode (and because it's a difficult
+      // assumption to really enforce)
+      removeStoredBlock(block, zombie.getDatanodeDescriptor());
+      invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
+    }
+    assert(zombie.numBlocks() == 0);
+    LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
+        "which no longer exists on the DataNode.",
+        Long.toHexString(context.getReportId()), prevBlocks,
+        zombie.getStorageID());
+  }
+
   /**
    * Rescan the list of blocks which were previously postponed.
    */
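The BlockManager changes above implement the actual pruning decision: only after the last storage of the last RPC of a report has been processed, and only when every RPC of that report has been seen, does the NameNode remove storages whose recorded lastBlockReportId differs from the report's ID. A toy illustration of just that selection rule, outside of HDFS (the map below stands in for the descriptor's storage map; the storage names are made up for the example):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ZombieSelection {
      public static void main(String[] args) {
        long reportId = 0x1234L;
        // storage ID -> lastBlockReportId recorded while processing this report
        Map<String, Long> lastReportIds = new HashMap<>();
        lastReportIds.put("DS-1", reportId); // appeared in the current report
        lastReportIds.put("DS-2", 0L);       // never reported: a zombie candidate
        List<String> zombies = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastReportIds.entrySet()) {
          if (e.getValue() != reportId) {
            zombies.add(e.getKey());
          }
        }
        System.out.println(zombies); // prints [DS-2]
      }
    }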
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,6 +32,7 @@ import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -64,7 +67,25 @@ public class DatanodeDescriptor extends DatanodeInfo {
   // Stores status of decommissioning.
   // If node is not decommissioning, do not use this object for anything.
   public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus();
 
+  private long curBlockReportId = 0;
+
+  private BitSet curBlockReportRpcsSeen = null;
+
+  public int updateBlockReportContext(BlockReportContext context) {
+    if (curBlockReportId != context.getReportId()) {
+      curBlockReportId = context.getReportId();
+      curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
+    }
+    curBlockReportRpcsSeen.set(context.getCurRpc());
+    return curBlockReportRpcsSeen.cardinality();
+  }
+
+  public void clearBlockReportContext() {
+    curBlockReportId = 0;
+    curBlockReportRpcsSeen = null;
+  }
+
   /** Block and targets pair */
   @InterfaceAudience.Private
   @InterfaceStability.Evolving
@@ -284,6 +305,34 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  static final private List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
+      ImmutableList.of();
+
+  List<DatanodeStorageInfo> removeZombieStorages() {
+    List<DatanodeStorageInfo> zombies = null;
+    synchronized (storageMap) {
+      Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
+          storageMap.entrySet().iterator();
+      while (iter.hasNext()) {
+        Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
+        DatanodeStorageInfo storageInfo = entry.getValue();
+        if (storageInfo.getLastBlockReportId() != curBlockReportId) {
+          LOG.info(storageInfo.getStorageID() + " had lastBlockReportId 0x" +
+              Long.toHexString(storageInfo.getLastBlockReportId()) +
+              ", but curBlockReportId = 0x" +
+              Long.toHexString(curBlockReportId));
+          iter.remove();
+          if (zombies == null) {
+            zombies = new LinkedList<DatanodeStorageInfo>();
+          }
+          zombies.add(storageInfo);
+        }
+        storageInfo.setLastBlockReportId(0);
+      }
+    }
+    return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
+  }
+
   /**
    * Remove block from the list of blocks belonging to the data-node. Remove
    * data-node from the block.
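updateBlockReportContext above tracks the RPCs of the current report with a BitSet rather than a counter, so a duplicated or re-sent RPC cannot make a report look complete before every index has actually arrived. A standalone sketch of that behaviour (illustrative only, outside the patch):

    import java.util.BitSet;

    public class RpcTracking {
      public static void main(String[] args) {
        // A report split into 3 RPCs is complete only once indices 0, 1 and 2
        // have all been seen, regardless of arrival order or repeats.
        BitSet seen = new BitSet(3);
        for (int curRpc : new int[] {2, 0, 0, 1}) {
          seen.set(curRpc);
        }
        System.out.println(seen.cardinality() >= 3); // prints true
      }
    }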
@@ -115,6 +115,9 @@ public class DatanodeStorageInfo {
   private volatile BlockInfoContiguous blockList = null;
   private int numBlocks = 0;
 
+  // The ID of the last full block report which updated this storage.
+  private long lastBlockReportId = 0;
+
   /** The number of block reports received */
   private int blockReportCount = 0;
 
@@ -178,7 +181,15 @@ public class DatanodeStorageInfo {
     this.remaining = remaining;
     this.blockPoolUsed = blockPoolUsed;
   }
 
+  long getLastBlockReportId() {
+    return lastBlockReportId;
+  }
+
+  void setLastBlockReportId(long lastBlockReportId) {
+    this.lastBlockReportId = lastBlockReportId;
+  }
+
   State getState() {
     return this.state;
   }
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -434,6 +435,17 @@ class BPServiceActor implements Runnable {
     return sendImmediateIBR;
   }
 
+  private long prevBlockReportId = 0;
+
+  private long generateUniqueBlockReportId() {
+    long id = System.nanoTime();
+    if (id <= prevBlockReportId) {
+      id = prevBlockReportId + 1;
+    }
+    prevBlockReportId = id;
+    return id;
+  }
+
   /**
    * Report the list blocks to the Namenode
    * @return DatanodeCommands returned by the NN. May be null.
@@ -476,11 +488,13 @@ class BPServiceActor implements Runnable {
     int numRPCs = 0;
     boolean success = false;
     long brSendStartTime = monotonicNow();
+    long reportId = generateUniqueBlockReportId();
     try {
       if (totalBlockCount < dnConf.blockReportSplitThreshold) {
         // Below split threshold, send all reports in a single message.
         DatanodeCommand cmd = bpNamenode.blockReport(
-            bpRegistration, bpos.getBlockPoolId(), reports);
+            bpRegistration, bpos.getBlockPoolId(), reports,
+            new BlockReportContext(1, 0, reportId));
         numRPCs = 1;
         numReportsSent = reports.length;
         if (cmd != null) {
@@ -488,10 +502,11 @@ class BPServiceActor implements Runnable {
         }
       } else {
         // Send one block report per message.
-        for (StorageBlockReport report : reports) {
-          StorageBlockReport singleReport[] = { report };
+        for (int r = 0; r < reports.length; r++) {
+          StorageBlockReport singleReport[] = { reports[r] };
           DatanodeCommand cmd = bpNamenode.blockReport(
-              bpRegistration, bpos.getBlockPoolId(), singleReport);
+              bpRegistration, bpos.getBlockPoolId(), singleReport,
+              new BlockReportContext(reports.length, r, reportId));
           numReportsSent++;
           numRPCs++;
           if (cmd != null) {
@@ -507,11 +522,12 @@ class BPServiceActor implements Runnable {
       dn.getMetrics().addBlockReport(brSendCost);
       final int nCmds = cmds.size();
       LOG.info((success ? "S" : "Uns") +
-          "uccessfully sent " + numReportsSent +
-          " of " + reports.length +
-          " blockreports for " + totalBlockCount +
-          " total blocks using " + numRPCs +
-          " RPCs. This took " + brCreateCost +
+          "uccessfully sent block report 0x" +
+          Long.toHexString(reportId) + ",  containing " + reports.length +
+          " storage report(s), of which we sent " + numReportsSent + "." +
+          " The reports had " + totalBlockCount +
+          " total blocks and used " + numRPCs +
+          " RPC(s). This took " + brCreateCost +
           " msec to generate and " + brSendCost +
           " msecs for RPC and NN processing." +
           " Got back " +
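With the BPServiceActor change above, every RPC belonging to one full block report carries the same reportId while curRpc records its position, which is what the NameNode uses to tell when the report is complete. A small sketch of the values produced for a report split across three storages, assuming the BlockReportContext class from this patch is on the classpath:

    import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;

    public class SplitReportContexts {
      public static void main(String[] args) {
        long reportId = System.nanoTime(); // one ID shared by all RPCs of the report
        for (int r = 0; r < 3; r++) {
          BlockReportContext ctx = new BlockReportContext(3, r, reportId);
          System.out.println("rpc " + ctx.getCurRpc() + " of " + ctx.getTotalRpcs()
              + ", report 0x" + Long.toHexString(ctx.getReportId()));
        }
      }
    }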
@@ -121,6 +121,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1286,7 +1287,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
 
   @Override // DatanodeProtocol
   public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
-      String poolId, StorageBlockReport[] reports) throws IOException {
+      String poolId, StorageBlockReport[] reports,
+      BlockReportContext context) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     if(blockStateChangeLog.isDebugEnabled()) {
@@ -1295,14 +1297,15 @@ class NameNodeRpcServer implements NamenodeProtocols {
     }
     final BlockManager bm = namesystem.getBlockManager();
     boolean noStaleStorages = false;
-    for(StorageBlockReport r : reports) {
-      final BlockListAsLongs blocks = r.getBlocks();
+    for (int r = 0; r < reports.length; r++) {
+      final BlockListAsLongs blocks = reports[r].getBlocks();
       //
       // BlockManager.processReport accumulates information of prior calls
       // for the same node and storage, so the value returned by the last
       // call of this loop is the final updated value for noStaleStorage.
       //
-      noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks);
+      noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
+          blocks, context, (r == reports.length - 1));
       metrics.incrStorageBlockReportOps();
     }
 
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * The context of the block report.
+ *
+ * This is a set of fields that the Datanode sends to provide context about a
+ * block report RPC. The context includes a unique 64-bit ID which
+ * identifies the block report as a whole. It also includes the total number
+ * of RPCs which this block report is split into, and the index into that
+ * total for the current RPC.
+ */
+public class BlockReportContext {
+  private final int totalRpcs;
+  private final int curRpc;
+  private final long reportId;
+
+  public BlockReportContext(int totalRpcs, int curRpc, long reportId) {
+    this.totalRpcs = totalRpcs;
+    this.curRpc = curRpc;
+    this.reportId = reportId;
+  }
+
+  public int getTotalRpcs() {
+    return totalRpcs;
+  }
+
+  public int getCurRpc() {
+    return curRpc;
+  }
+
+  public long getReportId() {
+    return reportId;
+  }
+}
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -128,20 +127,23 @@ public interface DatanodeProtocol {
    * Each finalized block is represented as 3 longs. Each under-
    * construction replica is represented as 4 longs.
    * This is done instead of Block[] to reduce memory used by block reports.
-   *
+   * @param reports report of blocks per storage
+   * @param context Context information for this block report.
+   *
    * @return - the next command for DN to process.
    * @throws IOException
    */
   @Idempotent
   public DatanodeCommand blockReport(DatanodeRegistration registration,
-      String poolId, StorageBlockReport[] reports) throws IOException;
+      String poolId, StorageBlockReport[] reports,
+      BlockReportContext context) throws IOException;
 
 
   /**
    * Communicates the complete list of locally cached blocks to the NameNode.
    *
    * This method is similar to
-   * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])},
+   * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[], BlockReportContext)},
    * which is used to communicated blocks stored on disk.
    *
    * @param The datanode registration.
@@ -227,11 +227,25 @@ message HeartbeatResponseProto {
  * second long represents length
  * third long represents gen stamp
  * fourth long (if under construction) represents replica state
+ * context - An optional field containing information about the context
+ *           of this block report.
  */
 message BlockReportRequestProto {
   required DatanodeRegistrationProto registration = 1;
   required string blockPoolId = 2;
   repeated StorageBlockReportProto reports = 3;
+  optional BlockReportContextProto context = 4;
+}
+
+message BlockReportContextProto {
+  // The total number of RPCs this block report is broken into.
+  required int32 totalRpcs = 1;
+
+  // The index of the current RPC (zero-based)
+  required int32 curRpc = 2;
+
+  // The unique 64-bit ID of this block report
+  required int64 id = 3;
 }
 
 /**
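In the message definitions above, the three fields of BlockReportContextProto are required, but the context field of BlockReportRequestProto is optional, so a NameNode running this code still accepts reports from DataNodes that do not send a context (the server-side translator passes null in that case). A minimal builder sketch using the generated protobuf class, assuming the generated sources are available (the field values are arbitrary):

    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;

    public class BuildContextProto {
      public static void main(String[] args) {
        BlockReportContextProto proto = BlockReportContextProto.newBuilder()
            .setTotalRpcs(3)          // required
            .setCurRpc(0)             // required
            .setId(System.nanoTime()) // required
            .build();
        System.out.println(proto);
      }
    }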
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -219,7 +220,8 @@ public class TestBlockListAsLongs {
     // check DN sends new-style BR
     request.set(null);
     nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
-    nn.blockReport(reg, "pool", sbr);
+    nn.blockReport(reg, "pool", sbr,
+        new BlockReportContext(1, 0, System.nanoTime()));
     BlockReportRequestProto proto = request.get();
     assertNotNull(proto);
     assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@@ -228,7 +230,8 @@ public class TestBlockListAsLongs {
     // back up to prior version and check DN sends old-style BR
     request.set(null);
     nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
-    nn.blockReport(reg, "pool", sbr);
+    nn.blockReport(reg, "pool", sbr,
+        new BlockReportContext(1, 0, System.nanoTime()));
     proto = request.get();
     assertNotNull(proto);
     assertFalse(proto.getReports(0).getBlocksList().isEmpty());
@@ -555,12 +555,12 @@ public class TestBlockManager {
     reset(node);
 
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY);
+        BlockListAsLongs.EMPTY, null, false);
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY);
+        BlockListAsLongs.EMPTY, null, false);
     assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
@@ -571,7 +571,7 @@ public class TestBlockManager {
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY);
+        BlockListAsLongs.EMPTY, null, false);
     // Reinitialize as registration with empty storage list pruned
     // node.storageMap.
     ds = node.getStorageInfos()[0];
@@ -600,7 +600,7 @@ public class TestBlockManager {
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY);
+        BlockListAsLongs.EMPTY, null, false);
     assertEquals(1, ds.getBlockReportCount());
   }
 
@@ -18,26 +18,40 @@
 
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math3.stat.inference.TestUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
 
 
 public class TestNameNodePrunesMissingStorages {
@@ -110,7 +124,9 @@ public class TestNameNodePrunesMissingStorages {
   }
 
   /**
-   * Verify that the NameNode does not prune storages with blocks.
+   * Verify that the NameNode does not prune storages with blocks
+   * simply as a result of a heartbeat being sent missing that storage.
+   *
    * @throws IOException
    */
   @Test (timeout=300000)
@@ -118,4 +134,119 @@ public class TestNameNodePrunesMissingStorages {
     // Run the test with 1 storage, after the text still expect 1 storage.
     runTest(GenericTestUtils.getMethodName(), true, 1, 1);
   }
+
+  /**
+   * Regression test for HDFS-7960.<p/>
+   *
+   * Shutting down a datanode, removing a storage directory, and restarting
+   * the DataNode should not produce zombie storages.
+   */
+  @Test(timeout=300000)
+  public void testRemovingStorageDoesNotProduceZombies() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    final int NUM_STORAGES_PER_DN = 2;
+    final MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(3)
+        .storagesPerDatanode(NUM_STORAGES_PER_DN)
+        .build();
+    try {
+      cluster.waitActive();
+      for (DataNode dn : cluster.getDataNodes()) {
+        assertEquals(NUM_STORAGES_PER_DN,
+          cluster.getNamesystem().getBlockManager().
+              getDatanodeManager().getDatanode(dn.getDatanodeId()).
+              getStorageInfos().length);
+      }
+      // Create a file which will end up on all 3 datanodes.
+      final Path TEST_PATH = new Path("/foo1");
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH, 1024, (short) 3, 0xcafecafe);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(dn);
+      }
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/foo1"));
+      cluster.getNamesystem().writeLock();
+      final String storageIdToRemove;
+      String datanodeUuid;
+      // Find the first storage which this block is in.
+      try {
+        Iterator<DatanodeStorageInfo> storageInfoIter =
+            cluster.getNamesystem().getBlockManager().
+                getStorages(block.getLocalBlock()).iterator();
+        assertTrue(storageInfoIter.hasNext());
+        DatanodeStorageInfo info = storageInfoIter.next();
+        storageIdToRemove = info.getStorageID();
+        datanodeUuid = info.getDatanodeDescriptor().getDatanodeUuid();
+      } finally {
+        cluster.getNamesystem().writeUnlock();
+      }
+      // Find the DataNode which holds that first storage.
+      final DataNode datanodeToRemoveStorageFrom;
+      int datanodeToRemoveStorageFromIdx = 0;
+      while (true) {
+        if (datanodeToRemoveStorageFromIdx >= cluster.getDataNodes().size()) {
+          Assert.fail("failed to find datanode with uuid " + datanodeUuid);
+          datanodeToRemoveStorageFrom = null;
+          break;
+        }
+        DataNode dn = cluster.getDataNodes().
+            get(datanodeToRemoveStorageFromIdx);
+        if (dn.getDatanodeUuid().equals(datanodeUuid)) {
+          datanodeToRemoveStorageFrom = dn;
+          break;
+        }
+        datanodeToRemoveStorageFromIdx++;
+      }
+      // Find the volume within the datanode which holds that first storage.
+      List<? extends FsVolumeSpi> volumes =
+          datanodeToRemoveStorageFrom.getFSDataset().getVolumes();
+      assertEquals(NUM_STORAGES_PER_DN, volumes.size());
+      String volumeDirectoryToRemove = null;
+      for (FsVolumeSpi volume : volumes) {
+        if (volume.getStorageID().equals(storageIdToRemove)) {
+          volumeDirectoryToRemove = volume.getBasePath();
+        }
+      }
+      // Shut down the datanode and remove the volume.
+      // Replace the volume directory with a regular file, which will
+      // cause a volume failure.  (If we merely removed the directory,
+      // it would be re-initialized with a new storage ID.)
+      assertNotNull(volumeDirectoryToRemove);
+      datanodeToRemoveStorageFrom.shutdown();
+      FileUtil.fullyDelete(new File(volumeDirectoryToRemove));
+      FileOutputStream fos = new FileOutputStream(volumeDirectoryToRemove);
+      try {
+        fos.write(1);
+      } finally {
+        fos.close();
+      }
+      cluster.restartDataNode(datanodeToRemoveStorageFromIdx);
+      // Wait for the NameNode to remove the storage.
+      LOG.info("waiting for the datanode to remove " + storageIdToRemove);
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          final DatanodeDescriptor dnDescriptor =
+              cluster.getNamesystem().getBlockManager().getDatanodeManager().
+                  getDatanode(datanodeToRemoveStorageFrom.getDatanodeUuid());
+          assertNotNull(dnDescriptor);
+          DatanodeStorageInfo[] infos = dnDescriptor.getStorageInfos();
+          for (DatanodeStorageInfo info : infos) {
+            if (info.getStorageID().equals(storageIdToRemove)) {
+              LOG.info("Still found storage " + storageIdToRemove + " on " +
+                  info + ".");
+              return false;
+            }
+          }
+          assertEquals(NUM_STORAGES_PER_DN - 1, infos.length);
+          return true;
+        }
+      }, 10, 30000);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -613,7 +614,8 @@ public abstract class BlockReportTestBase {
         .when(spy).blockReport(
             Mockito.<DatanodeRegistration>anyObject(),
             Mockito.anyString(),
-            Mockito.<StorageBlockReport[]>anyObject());
+            Mockito.<StorageBlockReport[]>anyObject(),
+            Mockito.<BlockReportContext>anyObject());
 
     // Force a block report to be generated. The block report will have
     // an RBW replica in it. Wait for the RPC to be sent, but block
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -216,7 +217,8 @@ public class TestBPOfferService {
       .when(mockNN2).blockReport(
           Mockito.<DatanodeRegistration>anyObject(),
           Mockito.eq(FAKE_BPID),
-          Mockito.<StorageBlockReport[]>anyObject());
+          Mockito.<StorageBlockReport[]>anyObject(),
+          Mockito.<BlockReportContext>anyObject());
 
     bpos.start();
     try {
@@ -406,7 +408,8 @@ public class TestBPOfferService {
           Mockito.verify(mockNN).blockReport(
               Mockito.<DatanodeRegistration>anyObject(),
               Mockito.eq(FAKE_BPID),
-              Mockito.<StorageBlockReport[]>anyObject());
+              Mockito.<StorageBlockReport[]>anyObject(),
+              Mockito.<BlockReportContext>anyObject());
           return true;
         } catch (Throwable t) {
           LOG.info("waiting on block report: " + t.getMessage());
@@ -431,7 +434,8 @@ public class TestBPOfferService {
           Mockito.verify(mockNN).blockReport(
               Mockito.<DatanodeRegistration>anyObject(),
              Mockito.eq(FAKE_BPID),
-              Mockito.<StorageBlockReport[]>anyObject());
+              Mockito.<StorageBlockReport[]>anyObject(),
+              Mockito.<BlockReportContext>anyObject());
           return true;
         } catch (Throwable t) {
           LOG.info("waiting on block report: " + t.getMessage());
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -122,7 +123,8 @@ public class TestBlockHasMultipleReplicasOnSameDN {
     }
 
     // Should not assert!
-    cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports);
+    cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
+        new BlockReportContext(1, 0, System.nanoTime()));
 
     // Get the block locations once again.
     locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -185,7 +186,7 @@ public class TestDataNodeVolumeFailure {
         new StorageBlockReport(dnStorage, blockList);
     }
 
-    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports);
+    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);
 
     // verify number of blocks and files...
     verify(filename, filesize);
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -136,7 +137,8 @@ public class TestDatanodeProtocolRetryPolicy {
           Mockito.verify(mockNN).blockReport(
               Mockito.eq(datanodeRegistration),
               Mockito.eq(POOL_ID),
-              Mockito.<StorageBlockReport[]>anyObject());
+              Mockito.<StorageBlockReport[]>anyObject(),
+              Mockito.<BlockReportContext>anyObject());
           return true;
         } catch (Throwable t) {
           LOG.info("waiting on block report: " + t.getMessage());
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
@@ -133,7 +134,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture());
+        captor.capture(), Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
   }
@@ -165,7 +166,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(1)).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture());
+        captor.capture(), Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
   }
@@ -197,7 +198,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
        any(DatanodeRegistration.class),
         anyString(),
-        captor.capture());
+        captor.capture(), Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
   }
@@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.util.Time;
 
 
 /**

@@ -33,10 +35,13 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
 @Override
 protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
 StorageBlockReport[] reports) throws IOException {
+int i = 0;
 for (StorageBlockReport report : reports) {
 LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
 StorageBlockReport[] singletonReport = { report };
-cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
+cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
+new BlockReportContext(reports.length, i, System.nanoTime()));
+i++;
 }
 }
 }
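The per-storage variant above splits one logical block report into reports.length RPCs, one per storage. A hedged sketch of that pattern with the report identifier hoisted out of the loop (the hunk above calls System.nanoTime() on every iteration; sharing one ID across the RPCs is an assumption about the intended usage, and variable names are illustrative):

    long reportId = System.nanoTime();
    for (int i = 0; i < reports.length; i++) {
      // Each RPC names the total number of RPCs in this report, its own index,
      // and the report identifier, so the receiver can tell when the report is complete.
      StorageBlockReport[] singleton = { reports[i] };
      cluster.getNameNodeRpc().blockReport(dnR, poolId, singleton,
          new BlockReportContext(reports.length, i, reportId));
    }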
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 

@@ -34,6 +35,7 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
 protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
 StorageBlockReport[] reports) throws IOException {
 LOG.info("Sending combined block reports for " + dnR);
-cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
+new BlockReportContext(1, 0, System.nanoTime()));
 }
 }
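In the combined variant above the whole report travels in a single RPC, so the context reads as "RPC 0 of 1". A hedged annotation of those constructor arguments, inferred from how this patch uses them rather than stated by it:

    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
        new BlockReportContext(
            1,                   // total RPCs making up this block report
            0,                   // index of this RPC within the report
            System.nanoTime())); // identifier for the report as a whole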
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;

@@ -76,7 +77,8 @@ public final class TestTriggerBlockReport {
 Mockito.verify(spy, times(0)).blockReport(
 any(DatanodeRegistration.class),
 anyString(),
-any(StorageBlockReport[].class));
+any(StorageBlockReport[].class),
+Mockito.<BlockReportContext>anyObject());
 Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
 any(DatanodeRegistration.class),
 anyString(),

@@ -113,7 +115,8 @@ public final class TestTriggerBlockReport {
 Mockito.verify(spy, timeout(60000)).blockReport(
 any(DatanodeRegistration.class),
 anyString(),
-any(StorageBlockReport[].class));
+any(StorageBlockReport[].class),
+Mockito.<BlockReportContext>anyObject());
 }
 
 cluster.shutdown();
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

@@ -939,7 +940,8 @@ public class NNThroughputBenchmark implements Tool {
 new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
 };
 nameNodeProto.blockReport(dnRegistration,
-nameNode.getNamesystem().getBlockPoolId(), reports);
+nameNode.getNamesystem().getBlockPoolId(), reports,
+new BlockReportContext(1, 0, System.nanoTime()));
 }
 
 /**

@@ -1184,8 +1186,9 @@ public class NNThroughputBenchmark implements Tool {
 long start = Time.now();
 StorageBlockReport[] report = { new StorageBlockReport(
 dn.storage, dn.getBlockReportList()) };
-nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
-.getBlockPoolId(), report);
+nameNodeProto.blockReport(dn.dnRegistration,
+nameNode.getNamesystem().getBlockPoolId(), report,
+new BlockReportContext(1, 0, System.nanoTime()));
 long end = Time.now();
 return end-start;
 }
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

@@ -107,7 +108,8 @@ public class TestDeadDatanode {
 new DatanodeStorage(reg.getDatanodeUuid()),
 BlockListAsLongs.EMPTY) };
 try {
-dnp.blockReport(reg, poolId, report);
+dnp.blockReport(reg, poolId, report,
+new BlockReportContext(1, 0, System.nanoTime()));
 fail("Expected IOException is not thrown");
 } catch (IOException ex) {
 // Expected
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.io.IOUtils;

@@ -547,7 +548,8 @@ public class TestDNFencing {
 .when(spy).blockReport(
 Mockito.<DatanodeRegistration>anyObject(),
 Mockito.anyString(),
-Mockito.<StorageBlockReport[]>anyObject());
+Mockito.<StorageBlockReport[]>anyObject(),
+Mockito.<BlockReportContext>anyObject());
 dn.scheduleAllBlockReport(0);
 delayer.waitForCall();
 