HDFS-13671. Namenode deletes large dir slowly caused by FoldedTreeSet#removeAndGet (#3113)

This commit is contained in:
Hui Fei 2021-06-18 14:41:02 +08:00 committed by GitHub
parent 577b96ef42
commit e55d76e26c
No known key found for this signature in database
37 changed files with 620 additions and 2619 deletions

View File

@ -291,18 +291,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
= "dfs.namenode.storageinfo.defragment.interval.ms";
public static final int
= "dfs.namenode.storageinfo.defragment.timeout.ms";
public static final int
= "dfs.namenode.storageinfo.defragment.ratio";
public static final double
= "dfs.namenode.blockreport.queue.size";

View File

@ -967,8 +967,8 @@ public static JournalInfoProto convert(JournalInfo j) {
public static BlockReportContext convert(BlockReportContextProto proto) {
return new BlockReportContext(proto.getTotalRpcs(), proto.getCurRpc(),
proto.getId(), proto.getLeaseId(), proto.getSorted());
return new BlockReportContext(proto.getTotalRpcs(),
proto.getCurRpc(), proto.getId(), proto.getLeaseId());
public static BlockReportContextProto convert(BlockReportContext context) {
@ -977,7 +977,6 @@ public static BlockReportContextProto convert(BlockReportContext context) {

View File

@ -19,8 +19,8 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
@ -57,9 +57,19 @@ public abstract class BlockInfo extends Block
/** For implementing {@link LightWeightGSet.LinkedElement} interface. */
private LightWeightGSet.LinkedElement nextLinkedElement;
// Storages this block is replicated on
protected DatanodeStorageInfo[] storages;
* This array contains triplets of references. For each i-th storage, the
* block belongs to triplets[3*i] is the reference to the
* {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
* references to the previous and the next blocks, respectively, in the list
* of blocks belonging to this storage.
* Using previous and next in Object triplets is done instead of a
* {@link LinkedList} list to efficiently use memory. With LinkedList the cost
* per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
* bytes using the triplets.
protected Object[] triplets;
private BlockUnderConstructionFeature uc;
@ -69,14 +79,14 @@ public abstract class BlockInfo extends Block
* in the block group
public BlockInfo(short size) {
this.storages = new DatanodeStorageInfo[size];
this.triplets = new Object[3 * size];
this.replication = isStriped() ? 0 : size;
public BlockInfo(Block blk, short size) {
this.storages = new DatanodeStorageInfo[size];
this.triplets = new Object[3*size];
this.replication = isStriped() ? 0 : size;
@ -106,31 +116,7 @@ public boolean isDeleted() {
public Iterator<DatanodeStorageInfo> getStorageInfos() {
return new Iterator<DatanodeStorageInfo>() {
private int index = 0;
public boolean hasNext() {
while (index < storages.length && storages[index] == null) {
return index < storages.length;
public DatanodeStorageInfo next() {
if (!hasNext()) {
throw new NoSuchElementException();
return storages[index++];
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
return new BlocksMap.StorageIterator(this);
public DatanodeDescriptor getDatanode(int index) {
@ -139,18 +125,73 @@ public DatanodeDescriptor getDatanode(int index) {
DatanodeStorageInfo getStorageInfo(int index) {
assert this.storages != null : "BlockInfo is not initialized";
return storages[index];
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
return (DatanodeStorageInfo)triplets[index*3];
BlockInfo getPrevious(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo)triplets[index*3+1];
assert info == null ||
info.getClass().getName().startsWith(BlockInfo.class.getName()) :
"BlockInfo is expected at " + index*3;
return info;
BlockInfo getNext(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo)triplets[index*3+2];
assert info == null || info.getClass().getName().startsWith(
BlockInfo.class.getName()) :
"BlockInfo is expected at " + index*3;
return info;
void setStorageInfo(int index, DatanodeStorageInfo storage) {
assert this.storages != null : "BlockInfo is not initialized";
this.storages[index] = storage;
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
triplets[index*3] = storage;
* Return the previous block on the block list for the datanode at
* position index. Set the previous block on the list to "to".
* @param index - the datanode index
* @param to - block to be set to previous on the list of blocks
* @return current previous block on the list of blocks
BlockInfo setPrevious(int index, BlockInfo to) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo) triplets[index*3+1];
triplets[index*3+1] = to;
return info;
* Return the next block on the block list for the datanode at
* position index. Set the next block on the list to "to".
* @param index - the datanode index
* @param to - block to be set to next on the list of blocks
* @return current next block on the list of blocks
BlockInfo setNext(int index, BlockInfo to) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo) triplets[index*3+2];
triplets[index*3+2] = to;
return info;
public int getCapacity() {
assert this.storages != null : "BlockInfo is not initialized";
return storages.length;
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
return triplets.length / 3;
@ -227,6 +268,80 @@ int findStorageInfo(DatanodeStorageInfo storageInfo) {
return -1;
* Insert this block into the head of the list of blocks
* related to the specified DatanodeStorageInfo.
* If the head is null then form a new list.
* @return current block as the new head of the list.
BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
int dnIndex = this.findStorageInfo(storage);
assert dnIndex >= 0 : "Data node is not found: current";
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is already in the list and cannot be inserted.";
this.setPrevious(dnIndex, null);
this.setNext(dnIndex, head);
if (head != null) {
head.setPrevious(head.findStorageInfo(storage), this);
return this;
* Remove this block from the list of blocks
* related to the specified DatanodeStorageInfo.
* If this block is the head of the list then return the next block as
* the new head.
* @return the new head of the list or null if the list becomes
* empy after deletion.
BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
if (head == null) {
return null;
int dnIndex = this.findStorageInfo(storage);
if (dnIndex < 0) { // this block is not on the data-node list
return head;
BlockInfo next = this.getNext(dnIndex);
BlockInfo prev = this.getPrevious(dnIndex);
this.setNext(dnIndex, null);
this.setPrevious(dnIndex, null);
if (prev != null) {
prev.setNext(prev.findStorageInfo(storage), next);
if (next != null) {
next.setPrevious(next.findStorageInfo(storage), prev);
if (this == head) { // removing the head
head = next;
return head;
* Remove this block from the list of blocks related to the specified
* DatanodeDescriptor. Insert it into the head of the list of blocks.
* @return the new head of the list.
public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
int curIndex, int headIndex) {
if (head == this) {
return this;
BlockInfo next = this.setNext(curIndex, head);
BlockInfo prev = this.setPrevious(curIndex, null);
head.setPrevious(headIndex, this);
prev.setNext(prev.findStorageInfo(storage), next);
if (next != null) {
next.setPrevious(next.findStorageInfo(storage), prev);
return this;
public int hashCode() {
// Super implementation is sufficient

View File

@ -38,20 +38,20 @@ public BlockInfoContiguous(Block blk, short size) {
* Ensure that there is enough space to include num more storages.
* @return first free storage index.
* Ensure that there is enough space to include num more triplets.
* @return first free triplet index.
private int ensureCapacity(int num) {
assert this.storages != null : "BlockInfo is not initialized";
assert this.triplets != null : "BlockInfo is not initialized";
int last = numNodes();
if (storages.length >= (last+num)) {
if (triplets.length >= (last+num)*3) {
return last;
/* Not enough space left. Create a new array. Should normally
* happen only when replication is manually increased by the user. */
DatanodeStorageInfo[] old = storages;
storages = new DatanodeStorageInfo[(last+num)];
System.arraycopy(old, 0, storages, 0, last);
Object[] old = triplets;
triplets = new Object[(last+num)*3];
System.arraycopy(old, 0, triplets, 0, last * 3);
return last;
@ -63,6 +63,8 @@ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
// find the last null node
int lastNode = ensureCapacity(1);
setStorageInfo(lastNode, storage);
setNext(lastNode, null);
setPrevious(lastNode, null);
return true;
@ -72,12 +74,18 @@ boolean removeStorage(DatanodeStorageInfo storage) {
if (dnIndex < 0) { // the node is not found
return false;
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is still in the list and must be removed first.";
// find the last not null node
int lastNode = numNodes()-1;
// replace current node entry by the lastNode one
// replace current node triplet by the lastNode one
setStorageInfo(dnIndex, getStorageInfo(lastNode));
// set the last entry to null
setNext(dnIndex, getNext(lastNode));
setPrevious(dnIndex, getPrevious(lastNode));
// set the last triplet to null
setStorageInfo(lastNode, null);
setNext(lastNode, null);
setPrevious(lastNode, null);
return true;
@ -96,7 +104,8 @@ boolean isProvided() {
public int numNodes() {
assert this.storages != null : "BlockInfo is not initialized";
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
for (int idx = getCapacity()-1; idx >= 0; idx--) {
if (getDatanode(idx) != null) {

View File

@ -32,20 +32,21 @@
* Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
* We still use a storage array to store DatanodeStorageInfo for each block in
* the block group. For a (m+k) block group, the first (m+k) storage units
* We still use triplets to store DatanodeStorageInfo for each block in the
* block group, as well as the previous/next block in the corresponding
* DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
* are sorted and strictly mapped to the corresponding block.
* Normally each block belonging to group is stored in only one DataNode.
* However, it is possible that some block is over-replicated. Thus the storage
* However, it is possible that some block is over-replicated. Thus the triplet
* array's size can be larger than (m+k). Thus currently we use an extra byte
* array to record the block index for each entry.
* array to record the block index for each triplet.
public class BlockInfoStriped extends BlockInfo {
private final ErasureCodingPolicy ecPolicy;
* Always the same size with storage. Record the block index for each entry
* Always the same size with triplets. Record the block index for each triplet
* TODO: actually this is only necessary for over-replicated block. Thus can
* be further optimized to save memory usage.
@ -109,7 +110,7 @@ private int findSlot() {
return i;
// need to expand the storage size
// need to expand the triplet size
ensureCapacity(i + 1, true);
return i;
@ -141,6 +142,8 @@ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
private void addStorage(DatanodeStorageInfo storage, int index,
int blockIndex) {
setStorageInfo(index, storage);
setNext(index, null);
setPrevious(index, null);
indices[index] = (byte) blockIndex;
@ -183,22 +186,26 @@ boolean removeStorage(DatanodeStorageInfo storage) {
if (dnIndex < 0) { // the node is not found
return false;
// set the entry to null
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is still in the list and must be removed first.";
// set the triplet to null
setStorageInfo(dnIndex, null);
setNext(dnIndex, null);
setPrevious(dnIndex, null);
indices[dnIndex] = -1;
return true;
private void ensureCapacity(int totalSize, boolean keepOld) {
if (getCapacity() < totalSize) {
DatanodeStorageInfo[] old = storages;
Object[] old = triplets;
byte[] oldIndices = indices;
storages = new DatanodeStorageInfo[totalSize];
triplets = new Object[totalSize * 3];
indices = new byte[totalSize];
if (keepOld) {
System.arraycopy(old, 0, storages, 0, old.length);
System.arraycopy(old, 0, triplets, 0, old.length);
System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
@ -225,7 +232,8 @@ public BlockType getBlockType() {
public int numNodes() {
assert this.storages != null : "BlockInfo is not initialized";
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
int num = 0;
for (int idx = getCapacity()-1; idx >= 0; idx--) {
if (getStorageInfo(idx) != null) {
@ -304,7 +312,8 @@ public StorageAndBlockIndex next() {
throw new NoSuchElementException();
int i = index++;
return new StorageAndBlockIndex(storages[i], indices[i]);
return new StorageAndBlockIndex(
(DatanodeStorageInfo) triplets[i * 3], indices[i]);

View File

@ -35,6 +35,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@ -68,7 +69,6 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -109,7 +109,6 @@
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;
@ -124,7 +123,6 @@
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@ -311,11 +309,6 @@ public long getTotalECBlockGroups() {
private int replQueueResetToHeadThreshold;
private int replQueueCallsSinceReset = 0;
/** How often to check and the limit for the storageinfo efficiency. */
private final long storageInfoDefragmentInterval;
private final long storageInfoDefragmentTimeout;
private final double storageInfoDefragmentRatio;
* Mapping: Block {@literal ->} { BlockCollection, datanodes, self ref }
* Updated only in response to client-sent information.
@ -330,10 +323,6 @@ public long getTotalECBlockGroups() {
* {@link #redundancyThread} has run at least one full iteration.
private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
/** StorageInfoDefragmenter thread. */
private final Daemon storageInfoDefragmenterThread =
new Daemon(new StorageInfoDefragmenter());
/** Block report thread for handling async reports. */
private final BlockReportProcessingThread blockReportThread;
@ -547,19 +536,6 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
this.storageInfoDefragmentInterval =
this.storageInfoDefragmentTimeout =
this.storageInfoDefragmentRatio =
this.encryptDataTransfer =
@ -748,8 +724,6 @@ public void activate(Configuration conf, long blockTotal) {
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
@ -762,10 +736,8 @@ public void close() {
try {
} catch (InterruptedException ie) {
@ -1678,18 +1650,9 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
/** Remove the blocks associated to the given datanode. */
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
final Iterator<BlockInfo> it = storage.getBlockIterator();
//add the BlockInfos to a new collection as the
//returned iterator is not modifiable.
Collection<BlockInfo> toRemove = new ArrayList<>();
final Iterator<BlockInfo> it = node.getBlockIterator();
while(it.hasNext()) {
for (BlockInfo b : toRemove) {
removeStoredBlock(b, node);
removeStoredBlock(it.next(), node);
// Remove all pending DN messages referencing this DN.
@ -1703,11 +1666,8 @@ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
assert namesystem.hasWriteLock();
final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
Collection<BlockInfo> toRemove = new ArrayList<>();
while(it.hasNext()) {
for (BlockInfo block : toRemove) {
BlockInfo block = it.next();
removeStoredBlock(block, node);
final Block b = getBlockOnStorage(block, storageInfo);
if (b != null) {
@ -1871,7 +1831,7 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
// stale storage due to failover or any other reason.
corruptReplicas.removeFromCorruptReplicasMap(b.getStored(), node);
BlockInfoStriped blk = (BlockInfoStriped) getStoredBlock(b.getStored());
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node, numberOfReplicas);
@ -2785,7 +2745,7 @@ public boolean processReport(final DatanodeID nodeID,
// Block reports for provided storage are not
// maintained by DN heartbeats
if (!StorageType.PROVIDED.equals(storageInfo.getStorageType())) {
invalidatedBlocks = processReport(storageInfo, newReport, context);
invalidatedBlocks = processReport(storageInfo, newReport);
@ -2882,8 +2842,7 @@ void rescanPostponedMisreplicatedBlocks() {
Collection<Block> processReport(
final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report,
BlockReportContext context) throws IOException {
final BlockListAsLongs report) throws IOException {
// Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
@ -2893,37 +2852,9 @@ Collection<Block> processReport(
Collection<Block> toInvalidate = new ArrayList<>();
Collection<BlockToMarkCorrupt> toCorrupt = new ArrayList<>();
Collection<StatefulBlockInfo> toUC = new ArrayList<>();
boolean sorted = false;
String strBlockReportId = "";
if (context != null) {
sorted = context.isSorted();
strBlockReportId = Long.toHexString(context.getReportId());
Iterable<BlockReportReplica> sortedReport;
if (!sorted) {
blockLog.warn("BLOCK* processReport 0x{}: Report from the DataNode ({}) "
+ "is unsorted. This will cause overhead on the NameNode "
+ "which needs to sort the Full BR. Please update the "
+ "DataNode to the same version of Hadoop HDFS as the "
+ "NameNode ({}).",
Set<BlockReportReplica> set = new FoldedTreeSet<>();
for (BlockReportReplica iblk : report) {
set.add(new BlockReportReplica(iblk));
sortedReport = set;
} else {
sortedReport = report;
reportDiffSorted(storageInfo, sortedReport,
reportDiff(storageInfo, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Process the blocks on each queue
for (StatefulBlockInfo b : toUC) {
@ -2939,8 +2870,8 @@ Collection<Block> processReport(
if (numBlocksLogged > maxNumBlocksToLog) {
blockLog.info("BLOCK* processReport 0x{}: logged info for {} of {} " +
"reported.", strBlockReportId, maxNumBlocksToLog, numBlocksLogged);
blockLog.info("BLOCK* processReport: logged info for {} of {} " +
"reported.", maxNumBlocksToLog, numBlocksLogged);
for (Block b : toInvalidate) {
addToInvalidates(b, node);
@ -3072,106 +3003,127 @@ void processFirstBlockReport(
private void reportDiffSorted(DatanodeStorageInfo storageInfo,
Iterable<BlockReportReplica> newReport,
private void reportDiff(DatanodeStorageInfo storageInfo,
BlockListAsLongs newReport,
Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
// The blocks must be sorted and the storagenodes blocks must be sorted
Iterator<BlockInfo> storageBlocksIterator = storageInfo.getBlockIterator();
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
BlockInfo storageBlock = null;
Block delimiterBlock = new Block();
BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
(short) 1);
AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
assert result == AddBlockResult.ADDED
: "Delimiting block cannot be present in the node";
int headIndex = 0; //currently the delimiter is in the head of the list
int curIndex;
for (BlockReportReplica replica : newReport) {
long replicaID = replica.getBlockId();
if (BlockIdManager.isStripedBlockID(replicaID)
&& (!hasNonEcBlockUsingStripedID ||
!blocksMap.containsBlock(replica))) {
replicaID = BlockIdManager.convertToStripedID(replicaID);
if (newReport == null) {
newReport = BlockListAsLongs.EMPTY;
// scan the report and process newly reported blocks
for (BlockReportReplica iblk : newReport) {
ReplicaState iState = iblk.getState();
LOG.debug("Reported block {} on {} size {} replicaState = {}", iblk, dn,
iblk.getNumBytes(), iState);
BlockInfo storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
ReplicaState reportedState = replica.getState();
LOG.debug("Reported block {} on {} size {} replicaState = {}",
replica, dn, replica.getNumBytes(), reportedState);
if (shouldPostponeBlocksFromFuture
&& isGenStampInFuture(replica)) {
queueReportedBlock(storageInfo, replica, reportedState,
// move block to the head of the list
if (storedBlock != null) {
curIndex = storedBlock.findStorageInfo(storageInfo);
if (curIndex >= 0) {
headIndex =
storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
if (storageBlock == null && storageBlocksIterator.hasNext()) {
storageBlock = storageBlocksIterator.next();
do {
int cmp;
if (storageBlock == null ||
(cmp = Long.compare(replicaID, storageBlock.getBlockId())) < 0) {
// Check if block is available in NN but not yet on this storage
BlockInfo nnBlock = blocksMap.getStoredBlock(new Block(replicaID));
if (nnBlock != null) {
reportDiffSortedInner(storageInfo, replica, reportedState,
nnBlock, toAdd, toCorrupt, toUC);
} else {
// Replica not found anywhere so it should be invalidated
toInvalidate.add(new Block(replica));
} else if (cmp == 0) {
// Replica matched current storageblock
reportDiffSortedInner(storageInfo, replica, reportedState,
storageBlock, toAdd, toCorrupt, toUC);
storageBlock = null;
} else {
// replica has higher ID than storedBlock
// Remove all stored blocks with IDs lower than replica
do {
storageBlock = storageBlocksIterator.hasNext()
? storageBlocksIterator.next() : null;
} while (storageBlock != null &&
Long.compare(replicaID, storageBlock.getBlockId()) > 0);
} while (storageBlock != null);
// Iterate any remaining blocks that have not been reported and remove them
while (storageBlocksIterator.hasNext()) {
private void reportDiffSortedInner(
// collect blocks that have not been reported
// all of them are next to the delimiter
Iterator<BlockInfo> it =
storageInfo.new BlockIterator(delimiter.getNext(0));
while (it.hasNext()) {
* Process a block replica reported by the data-node.
* No side effects except adding to the passed-in Collections.
* <ol>
* <li>If the block is not known to the system (not in blocksMap) then the
* data-node should be notified to invalidate this block.</li>
* <li>If the reported replica is valid that is has the same generation stamp
* and length as recorded on the name-node, then the replica location should
* be added to the name-node.</li>
* <li>If the reported replica is not valid, then it is marked as corrupt,
* which triggers replication of the existing valid replicas.
* Corrupt replicas are removed from the system when the block
* is fully replicated.</li>
* <li>If the reported replica is for a block currently marked "under
* construction" in the NN, then it should be added to the
* BlockUnderConstructionFeature's list of replicas.</li>
* </ol>
* @param storageInfo DatanodeStorageInfo that sent the report.
* @param block reported block replica
* @param reportedState reported replica state
* @param toAdd add to DatanodeDescriptor
* @param toInvalidate missing blocks (not in the blocks map)
* should be removed from the data-node
* @param toCorrupt replicas with unexpected length or generation stamp;
* add to corrupt replicas
* @param toUC replicas of blocks currently under construction
* @return the up-to-date stored block, if it should be kept.
* Otherwise, null.
private BlockInfo processReportedBlock(
final DatanodeStorageInfo storageInfo,
final BlockReportReplica replica, final ReplicaState reportedState,
final BlockInfo storedBlock,
final Block block, final ReplicaState reportedState,
final Collection<BlockInfoToAdd> toAdd,
final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
assert replica != null;
assert storedBlock != null;
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
LOG.debug("Reported block {} on {} size {} replicaState = {}", block, dn,
block.getNumBytes(), reportedState);
if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
queueReportedBlock(storageInfo, block, reportedState,
return null;
// find block by blockId
BlockInfo storedBlock = getStoredBlock(block);
if(storedBlock == null) {
// If blocksMap does not contain reported block id,
// the replica should be removed from the data-node.
toInvalidate.add(new Block(block));
return null;
BlockUCState ucState = storedBlock.getBlockUCState();
// Block is on the NN
LOG.debug("In memory blockUCState = {}", ucState);
// Ignore replicas already scheduled to be removed from the DN
if (invalidateBlocks.contains(dn, replica)) {
if(invalidateBlocks.contains(dn, block)) {
return storedBlock;
BlockToMarkCorrupt c = checkReplicaCorrupt(replica, reportedState,
storedBlock, ucState, dn);
BlockToMarkCorrupt c = checkReplicaCorrupt(
block, reportedState, storedBlock, ucState, dn);
if (c != null) {
if (shouldPostponeBlocksFromFuture) {
// If the block is an out-of-date generation stamp or state,
@ -3181,21 +3133,28 @@ private void reportDiffSortedInner(
// comes from the IBR / FBR and hence what we should use to compare
// against the memory state.
// See HDFS-6289 and HDFS-15422 for more context.
queueReportedBlock(storageInfo, replica, reportedState,
queueReportedBlock(storageInfo, block, reportedState,
} else {
} else if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
toUC.add(new StatefulBlockInfo(storedBlock, new Block(replica),
} else if (reportedState == ReplicaState.FINALIZED &&
(storedBlock.findStorageInfo(storageInfo) == -1 ||
corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
return storedBlock;
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
toUC.add(new StatefulBlockInfo(storedBlock,
new Block(block), reportedState));
return storedBlock;
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
toAdd.add(new BlockInfoToAdd(storedBlock, new Block(replica)));
if (reportedState == ReplicaState.FINALIZED
&& (storedBlock.findStorageInfo(storageInfo) == -1 ||
corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
toAdd.add(new BlockInfoToAdd(storedBlock, new Block(block)));
return storedBlock;
@ -3438,7 +3397,7 @@ private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
// just add it
AddBlockResult result = storageInfo.addBlockInitial(storedBlock, reported);
AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
@ -4203,6 +4162,12 @@ private boolean processAndHandleReportedBlock(
DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
// blockReceived reports a finalized block
Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt =
new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
@ -4216,58 +4181,33 @@ private boolean processAndHandleReportedBlock(
return false;
// find block by blockId
BlockInfo storedBlock = getStoredBlock(block);
if(storedBlock == null) {
// If blocksMap does not contain reported block id,
// the replica should be removed from the data-node.
processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
toCorrupt, toUC);
// the block is only in one of the to-do lists
// if it is in none then data-node already has it
assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt
.size() <= 1 : "The block should be only in one of the lists.";
for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b, storageInfo);
long numBlocksLogged = 0;
for (BlockInfoToAdd b : toAdd) {
addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
numBlocksLogged < maxNumBlocksToLog);
if (numBlocksLogged > maxNumBlocksToLog) {
blockLog.debug("BLOCK* addBlock: logged info for {} of {} reported.",
maxNumBlocksToLog, numBlocksLogged);
for (Block b : toInvalidate) {
blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
"belong to any file", block, node, block.getNumBytes());
addToInvalidates(new Block(block), node);
return true;
"belong to any file", b, node, b.getNumBytes());
addToInvalidates(b, node);
BlockUCState ucState = storedBlock.getBlockUCState();
// Block is on the NN
LOG.debug("In memory blockUCState = {}", ucState);
// Ignore replicas already scheduled to be removed from the DN
if(invalidateBlocks.contains(node, block)) {
return true;
BlockToMarkCorrupt c = checkReplicaCorrupt(
block, reportedState, storedBlock, ucState, node);
if (c != null) {
if (shouldPostponeBlocksFromFuture) {
// If the block is an out-of-date generation stamp or state,
// but we're the standby, we shouldn't treat it as corrupt,
// but instead just queue it for later processing.
// Storing the reported block for later processing, as that is what
// comes from the IBR / FBR and hence what we should use to compare
// against the memory state.
// See HDFS-6289 and HDFS-15422 for more context.
queueReportedBlock(storageInfo, block, reportedState,
} else {
markBlockAsCorrupt(c, storageInfo, node);
return true;
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
return true;
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
&& (storedBlock.findStorageInfo(storageInfo) == -1 ||
corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b, storageInfo, node);
return true;
@ -4967,91 +4907,6 @@ public void run() {
* Runnable that monitors the fragmentation of the StorageInfo TreeSet and
* compacts it when it falls under a certain threshold.
private class StorageInfoDefragmenter implements Runnable {
public void run() {
while (namesystem.isRunning()) {
try {
// Check storage efficiency only when active NN is out of safe mode.
if (isPopulatingReplQueues()) {
} catch (Throwable t) {
if (!namesystem.isRunning()) {
LOG.info("Stopping thread.");
if (!(t instanceof InterruptedException)) {
LOG.info("Received an exception while shutting down.", t);
} else if (!checkNSRunning && t instanceof InterruptedException) {
LOG.info("Stopping for testing.");
LOG.error("Thread received Runtime exception.", t);
terminate(1, t);
private void scanAndCompactStorages() throws InterruptedException {
ArrayList<String> datanodesAndStorages = new ArrayList<>();
for (DatanodeDescriptor node
: datanodeManager.getDatanodeListForReport(DatanodeReportType.ALL)) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
try {
double ratio = storage.treeSetFillRatio();
if (ratio < storageInfoDefragmentRatio) {
LOG.debug("StorageInfo TreeSet fill ratio {} : {}{}",
storage.getStorageID(), ratio,
(ratio < storageInfoDefragmentRatio)
? " (queued for defragmentation)" : "");
} finally {
if (!datanodesAndStorages.isEmpty()) {
for (int i = 0; i < datanodesAndStorages.size(); i += 2) {
try {
final DatanodeDescriptor dn = datanodeManager.
if (dn == null) {
final DatanodeStorageInfo storage = dn.
getStorageInfo(datanodesAndStorages.get(i + 1));
if (storage != null) {
boolean aborted =
if (aborted) {
// Compaction timed out, reset iterator to continue with
// the same storage next iteration.
i -= 2;
LOG.info("StorageInfo TreeSet defragmented {} : {}{}",
storage.getStorageID(), storage.treeSetFillRatio(),
aborted ? " (aborted)" : "");
} finally {
// Wait between each iteration
* Compute block replication and block invalidation work that can be scheduled

View File

@ -17,7 +17,6 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.atomic.LongAdder;
@ -32,6 +31,37 @@
* the datanodes that store the block.
class BlocksMap {
public static class StorageIterator implements Iterator<DatanodeStorageInfo> {
private final BlockInfo blockInfo;
private int nextIdx = 0;
StorageIterator(BlockInfo blkInfo) {
this.blockInfo = blkInfo;
public boolean hasNext() {
if (blockInfo == null) {
return false;
while (nextIdx < blockInfo.getCapacity() &&
blockInfo.getDatanode(nextIdx) == null) {
// note that for striped blocks there may be null in the triplets
return nextIdx < blockInfo.getCapacity();
public DatanodeStorageInfo next() {
return blockInfo.getStorageInfo(nextIdx++);
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
/** Constant {@link LightWeightGSet} capacity. */
private final int capacity;
@ -111,16 +141,6 @@ void removeBlock(BlockInfo block) {
* Check if BlocksMap contains the block.
* @param b Block to check
* @return true if block is in the map, otherwise false
boolean containsBlock(Block b) {
return blocks.contains(b);
/** Returns the block object if it exists in the map. */
BlockInfo getStoredBlock(Block b) {
return blocks.get(b);
@ -131,9 +151,7 @@ BlockInfo getStoredBlock(Block b) {
* returns {@link Iterable} of the storages the block belongs to.
Iterable<DatanodeStorageInfo> getStorages(Block b) {
BlockInfo block = blocks.get(b);
return block != null ? getStorages(block)
: Collections.<DatanodeStorageInfo>emptyList();
return getStorages(blocks.get(b));
@ -141,17 +159,13 @@ Iterable<DatanodeStorageInfo> getStorages(Block b) {
* returns {@link Iterable} of the storages the block belongs to.
Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
if (storedBlock == null) {
return Collections.emptyList();
} else {
return new Iterable<DatanodeStorageInfo>() {
public Iterator<DatanodeStorageInfo> iterator() {
return storedBlock.getStorageInfos();
return new StorageIterator(storedBlock);
/** counts number of containing nodes. Better than using iterator. */
int numNodes(Block b) {
@ -169,7 +183,7 @@ boolean removeNode(Block b, DatanodeDescriptor node) {
if (info == null)
return false;
// remove block from the data-node set and the node from the block info
// remove block from the data-node list and the node from the block info
boolean removed = removeBlock(node, info);
if (info.hasNoStorage() // no datanodes left
@ -181,7 +195,7 @@ boolean removeNode(Block b, DatanodeDescriptor node) {
* Remove block from the set of blocks belonging to the data-node. Remove
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.
static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) {

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -28,7 +27,6 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@ -87,6 +85,32 @@ public void updateFromStorage(DatanodeStorage storage) {
storageType = storage.getStorageType();
* Iterates over the list of blocks belonging to the data-node.
class BlockIterator implements Iterator<BlockInfo> {
private BlockInfo current;
BlockIterator(BlockInfo head) {
this.current = head;
public boolean hasNext() {
return current != null;
public BlockInfo next() {
BlockInfo res = current;
current =
return res;
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
private final DatanodeDescriptor dn;
private final String storageID;
private StorageType storageType;
@ -98,7 +122,8 @@ public void updateFromStorage(DatanodeStorage storage) {
private volatile long remaining;
private long blockPoolUsed;
private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
private volatile BlockInfo blockList = null;
private int numBlocks = 0;
/** The number of block reports received */
private int blockReportCount = 0;
@ -182,7 +207,7 @@ void setHeartbeatedSinceFailover(boolean value) {
boolean areBlocksOnFailedStorage() {
return getState() == State.FAILED && !blocks.isEmpty();
return getState() == State.FAILED && numBlocks != 0;
@ -213,36 +238,6 @@ long getRemaining() {
long getBlockPoolUsed() {
return blockPoolUsed;
* For use during startup. Expects block to be added in sorted order
* to enable fast insert in to the DatanodeStorageInfo
* @param b Block to add to DatanodeStorageInfo
* @param reportedBlock The reported replica
* @return Enum describing if block was added, replaced or already existed
public AddBlockResult addBlockInitial(BlockInfo b, Block reportedBlock) {
// First check whether the block belongs to a different storage
// on the same DN.
AddBlockResult result = AddBlockResult.ADDED;
DatanodeStorageInfo otherStorage =
if (otherStorage != null) {
if (otherStorage != this) {
// The block belongs to a different storage. Remove it first.
result = AddBlockResult.REPLACED;
} else {
// The block is already associated with this storage.
return AddBlockResult.ALREADY_EXIST;
b.addStorage(this, reportedBlock);
return result;
public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
// First check whether the block belongs to a different storage
@ -262,8 +257,9 @@ public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
// add to the head of the data-node list
b.addStorage(this, reportedBlock);
return result;
@ -271,21 +267,45 @@ AddBlockResult addBlock(BlockInfo b) {
return addBlock(b, b);
public void insertToList(BlockInfo b) {
blockList = b.listInsert(blockList, this);
boolean removeBlock(BlockInfo b) {
return b.removeStorage(this);
blockList = b.listRemove(blockList, this);
if (b.removeStorage(this)) {
return true;
} else {
return false;
int numBlocks() {
return blocks.size();
return numBlocks;
Iterator<BlockInfo> getBlockIterator() {
return new BlockIterator(blockList);
* @return iterator to an unmodifiable set of blocks
* related to this {@link DatanodeStorageInfo}
* Move block to the head of the list of blocks belonging to the data-node.
* @return the index of the head of the blockList
Iterator<BlockInfo> getBlockIterator() {
return Collections.unmodifiableSet(blocks).iterator();
int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
return curIndex;
* Used for testing only.
* @return the head of the blockList
BlockInfo getBlockListHeadForTesting(){
return blockList;
void updateState(StorageReport r) {
@ -344,27 +364,6 @@ StorageReport toStorageReport() {
false, capacity, dfsUsed, remaining, blockPoolUsed, nonDfsUsed);
* The fill ratio of the underlying TreeSet holding blocks.
* @return the fill ratio of the tree
public double treeSetFillRatio() {
return blocks.fillRatio();
* Compact the underlying TreeSet holding blocks.
* @param timeout Maximum time to spend compacting the tree set in
* milliseconds.
* @return true if compaction completed, false if aborted
public boolean treeSetCompact(long timeout) {
return blocks.compact(timeout);
static Iterable<StorageType> toStorageTypes(
final Iterable<DatanodeStorageInfo> infos) {
return new Iterable<StorageType>() {

View File

@ -402,7 +402,7 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
// Below split threshold, send all reports in a single message.
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), reports,
new BlockReportContext(1, 0, reportId, fullBrLeaseId, true));
new BlockReportContext(1, 0, reportId, fullBrLeaseId));
calculateBlockReportPBSize(useBlocksBuffer, reports));
numRPCs = 1;
@ -417,7 +417,7 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport,
new BlockReportContext(reports.length, r, reportId,
fullBrLeaseId, true));
calculateBlockReportPBSize(useBlocksBuffer, singleReport));

View File

@ -479,8 +479,8 @@ private void scan() {
Collection<ScanInfo> diffRecord = new ArrayList<>();
statsRecord.totalBlocks = blockpoolReport.size();
final List<ReplicaInfo> bl;
bl = dataset.getSortedFinalizedBlocks(bpid);
final List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
Collections.sort(bl); // Sort based on blockId
int d = 0; // index for blockpoolReport
int m = 0; // index for memReprot

View File

@ -237,17 +237,16 @@ StorageReport[] getStorageReports(String bpid)
VolumeFailureSummary getVolumeFailureSummary();
* Gets a sorted list of references to the finalized blocks for the given
* block pool. The list is sorted by blockID.
* Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
* pool. The list is sorted by blockID.
* pool.
List<ReplicaInfo> getSortedFinalizedBlocks(String bpid);
List<ReplicaInfo> getFinalizedBlocks(String bpid);
* Check whether the in-memory block record matches the block on the disk,

View File

@ -1954,18 +1954,17 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
* Gets a list of references to the finalized blocks for the given block pool,
* sorted by blockID.
* Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
* pool. The list is sorted by blockID.
* pool.
public List<ReplicaInfo> getSortedFinalizedBlocks(String bpid) {
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
@ -26,7 +25,7 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import org.apache.hadoop.util.LightWeightResizableGSet;
import org.apache.hadoop.util.AutoCloseableLock;
@ -37,20 +36,9 @@ class ReplicaMap {
private final AutoCloseableLock readLock;
private final AutoCloseableLock writeLock;
// Map of block pool Id to a set of ReplicaInfo.
private final Map<String, FoldedTreeSet<ReplicaInfo>> map = new HashMap<>();
// Special comparator used to compare Long to Block ID in the TreeSet.
private static final Comparator<Object> LONG_AND_BLOCK_COMPARATOR
= new Comparator<Object>() {
public int compare(Object o1, Object o2) {
long lookup = (long) o1;
long stored = ((Block) o2).getBlockId();
return lookup > stored ? 1 : lookup < stored ? -1 : 0;
// Map of block pool Id to another map of block Id to ReplicaInfo.
private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
new HashMap<>();
ReplicaMap(AutoCloseableLock readLock, AutoCloseableLock writeLock) {
if (readLock == null || writeLock == null) {
@ -113,11 +101,8 @@ ReplicaInfo get(String bpid, Block block) {
ReplicaInfo get(String bpid, long blockId) {
try (AutoCloseableLock l = readLock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
return null;
return set.get(blockId, LONG_AND_BLOCK_COMPARATOR);
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.get(new Block(blockId)) : null;
@ -133,13 +118,13 @@ ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
try (AutoCloseableLock l = writeLock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
set = new FoldedTreeSet<>();
map.put(bpid, set);
m = new LightWeightResizableGSet<Block, ReplicaInfo>();
map.put(bpid, m);
return set.addOrReplace(replicaInfo);
return m.put(replicaInfo);
@ -151,18 +136,17 @@ ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
try (AutoCloseableLock l = writeLock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
set = new FoldedTreeSet<>();
map.put(bpid, set);
m = new LightWeightResizableGSet<Block, ReplicaInfo>();
map.put(bpid, m);
ReplicaInfo oldReplicaInfo = set.get(replicaInfo.getBlockId(),
ReplicaInfo oldReplicaInfo = m.get(replicaInfo);
if (oldReplicaInfo != null) {
return oldReplicaInfo;
} else {
return replicaInfo;
@ -201,13 +185,12 @@ ReplicaInfo remove(String bpid, Block block) {
try (AutoCloseableLock l = writeLock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set != null) {
ReplicaInfo replicaInfo =
set.get(block.getBlockId(), LONG_AND_BLOCK_COMPARATOR);
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
ReplicaInfo replicaInfo = m.get(block);
if (replicaInfo != null &&
block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
return set.removeAndGet(replicaInfo);
return m.remove(block);
@ -224,9 +207,9 @@ ReplicaInfo remove(String bpid, Block block) {
ReplicaInfo remove(String bpid, long blockId) {
try (AutoCloseableLock l = writeLock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set != null) {
return set.removeAndGet(blockId, LONG_AND_BLOCK_COMPARATOR);
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
return m.remove(new Block(blockId));
return null;
@ -239,8 +222,8 @@ ReplicaInfo remove(String bpid, long blockId) {
int size(String bpid) {
try (AutoCloseableLock l = readLock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
return set != null ? set.size() : 0;
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.size() : 0;
@ -255,17 +238,19 @@ int size(String bpid) {
* @return a collection of the replicas belonging to the block pool
Collection<ReplicaInfo> replicas(String bpid) {
return map.get(bpid);
LightWeightResizableGSet<Block, ReplicaInfo> m = null;
m = map.get(bpid);
return m != null ? m.values() : null;
void initBlockPool(String bpid) {
try (AutoCloseableLock l = writeLock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
set = new FoldedTreeSet<>();
map.put(bpid, set);
m = new LightWeightResizableGSet<Block, ReplicaInfo>();
map.put(bpid, m);

View File

@ -1943,7 +1943,12 @@ public BatchedListEntries<OpenFileEntry> getFilesBlockingDecom(long prevId,
LightWeightHashSet<Long> openFileIds = new LightWeightHashSet<>();
for (DatanodeDescriptor dataNode :
blockManager.getDatanodeManager().getDatanodes()) {
for (long ucFileId : dataNode.getLeavingServiceStatus().getOpenFiles()) {
// Sort open files
LightWeightHashSet<Long> dnOpenFiles =
Long[] dnOpenFileIds = new Long[dnOpenFiles.size()];
for (Long ucFileId : dnOpenFileIds) {
INode ucFile = getFSDirectory().getInode(ucFileId);
if (ucFile == null || ucFileId <= prevId ||
openFileIds.contains(ucFileId)) {

View File

@ -52,16 +52,12 @@ public class BlockReportContext {
private final long leaseId;
private final boolean sorted;
public BlockReportContext(int totalRpcs, int curRpc,
long reportId, long leaseId,
boolean sorted) {
long reportId, long leaseId) {
this.totalRpcs = totalRpcs;
this.curRpc = curRpc;
this.reportId = reportId;
this.leaseId = leaseId;
this.sorted = sorted;
public int getTotalRpcs() {
@ -79,8 +75,4 @@ public long getReportId() {
public long getLeaseId() {
return leaseId;
public boolean isSorted() {
return sorted;

View File

@ -140,6 +140,7 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
* Each finalized block is represented as 3 longs. Each under-
* construction replica is represented as 4 longs.
* This is done instead of Block[] to reduce memory used by block reports.
* @param reports report of blocks per storage
* @param context Context information for this block report.
* @return - the next command for DN to process.

View File

@ -257,8 +257,9 @@ message BlockReportContextProto {
// bypass rate-limiting.
optional uint64 leaseId = 4 [ default = 0 ];
// for compatibility, field number 5 should not be reused, see HDFS-13671.
// True if the reported blocks are sorted by increasing block IDs
optional bool sorted = 5 [default = false];
// optional bool sorted = 5 [default = false];

View File

@ -5098,32 +5098,6 @@
Timeout value in ms for the StorageInfo compaction run.
The thread for checking the StorageInfo for defragmentation will
run periodically. The time between runs is determined by this
The defragmentation threshold for the StorageInfo.

View File

@ -173,7 +173,7 @@ private void thistest(Configuration conf, DFSTestUtil util) throws Exception {
final DataNode dn = cluster.getDataNodes().get(dnIdx);
final String bpid = cluster.getNamesystem().getBlockPoolId();
List<ReplicaInfo> replicas =
assertTrue("Replicas do not exist", !replicas.isEmpty());
for (int idx = 0; idx < replicas.size(); idx++) {

View File

@ -567,7 +567,7 @@ private void testErasureCodingWorkerXmitsWeight(
writeFile(fs, "/ec-xmits-weight", fileLen);
DataNode dn = cluster.getDataNodes().get(0);
int corruptBlocks = dn.getFSDataset().getSortedFinalizedBlocks(
int corruptBlocks = dn.getFSDataset().getFinalizedBlocks(
int expectedXmits = corruptBlocks * expectedWeight;

View File

@ -240,7 +240,7 @@ public BlockReportResponseProto answer(InvocationOnMock invocation) {
nn.blockReport(reg, "pool", sbr,
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
BlockReportRequestProto proto = request.get();
@ -253,7 +253,7 @@ public BlockReportResponseProto answer(InvocationOnMock invocation) {
StorageBlockReport[] obp = new StorageBlockReport[] {
new StorageBlockReport(new DatanodeStorage("s1"), blockList) };
nn.blockReport(reg, "pool", obp,
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
proto = request.get();

View File

@ -19,6 +19,12 @@
import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -28,6 +34,7 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.junit.Assert;
import org.junit.Test;
@ -133,4 +140,85 @@ public void testAddStorageWithDifferentBlock() throws Exception {
"storageID", "");
blockInfo1.addStorage(storage, blockInfo2);
public void testBlockListMoveToHead() throws Exception {
LOG.info("BlockInfo moveToHead tests...");
final int maxBlocks = 10;
DatanodeStorageInfo dd =
DFSTestUtil.createDatanodeStorageInfo("s1", "");
ArrayList<Block> blockList = new ArrayList<Block>(maxBlocks);
ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
int headIndex;
int curIndex;
LOG.info("Building block list...");
for (int i = 0; i < maxBlocks; i++) {
blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
blockInfoList.add(new BlockInfoContiguous(blockList.get(i), (short) 3));
// index of the datanode should be 0
assertEquals("Find datanode should be 0", 0, blockInfoList.get(i)
// list length should be equal to the number of blocks we inserted
LOG.info("Checking list length...");
assertEquals("Length should be MAX_BLOCK", maxBlocks, dd.numBlocks());
Iterator<BlockInfo> it = dd.getBlockIterator();
int len = 0;
while (it.hasNext()) {
assertEquals("There should be MAX_BLOCK blockInfo's", maxBlocks, len);
headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
LOG.info("Moving each block to the head of the list...");
for (int i = 0; i < maxBlocks; i++) {
curIndex = blockInfoList.get(i).findStorageInfo(dd);
headIndex = dd.moveBlockToHead(blockInfoList.get(i), curIndex, headIndex);
// the moved element must be at the head of the list
assertEquals("Block should be at the head of the list now.",
blockInfoList.get(i), dd.getBlockListHeadForTesting());
// move head of the list to the head - this should not change the list
LOG.info("Moving head to the head...");
BlockInfo temp = dd.getBlockListHeadForTesting();
curIndex = 0;
headIndex = 0;
dd.moveBlockToHead(temp, curIndex, headIndex);
"Moving head to the head of the list shopuld not change the list",
temp, dd.getBlockListHeadForTesting());
// check all elements of the list against the original blockInfoList
LOG.info("Checking elements of the list...");
temp = dd.getBlockListHeadForTesting();
assertNotNull("Head should not be null", temp);
int c = maxBlocks - 1;
while (temp != null) {
assertEquals("Expected element is not on the list",
blockInfoList.get(c--), temp);
temp = temp.getNext(0);
LOG.info("Moving random blocks to the head of the list...");
headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
Random rand = new Random();
for (int i = 0; i < maxBlocks; i++) {
int j = rand.nextInt(maxBlocks);
curIndex = blockInfoList.get(j).findStorageInfo(dd);
headIndex = dd.moveBlockToHead(blockInfoList.get(j), curIndex, headIndex);
// the moved element must be at the head of the list
assertEquals("Block should be at the head of the list now.",
blockInfoList.get(j), dd.getBlockListHeadForTesting());

View File

@ -65,7 +65,6 @@
import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -97,7 +96,6 @@
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
@ -1037,8 +1035,7 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception {
// Make sure it's the first full report
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockReportContext(1, 0, System.nanoTime(), 0, true));
builder.build(), null);
assertEquals(1, ds.getBlockReportCount());
// verify the storage info is correct
@ -1104,67 +1101,6 @@ public void testSafeModeWithProvidedStorageBR() throws Exception {
assertEquals(1, ds1.getBlockReportCount());
public void testFullBR() throws Exception {
DatanodeDescriptor node = nodes.get(0);
DatanodeStorageInfo ds = node.getStorageInfos()[0];
DatanodeRegistration nodeReg = new DatanodeRegistration(node, null, null, "");
// register new node
assertEquals(node, bm.getDatanodeManager().getDatanode(node));
assertEquals(0, ds.getBlockReportCount());
ArrayList<BlockInfo> blocks = new ArrayList<>();
for (int id = 24; id > 0; id--) {
// Make sure it's the first full report
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockReportContext(1, 0, System.nanoTime(), 0, false));
assertEquals(1, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
// Send unsorted report
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockReportContext(1, 0, System.nanoTime(), 0, false));
assertEquals(2, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
// Sort list and send a sorted report
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockReportContext(1, 0, System.nanoTime(), 0, true));
assertEquals(3, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
private BlockListAsLongs generateReport(List<BlockInfo> blocks) {
BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
for (BlockInfo block : blocks) {
builder.add(new FinalizedReplica(block, null, null));
return builder.build();
public void testUCBlockNotConsideredMissing() throws Exception {
DatanodeDescriptor node = nodes.get(0);
@ -1695,8 +1631,8 @@ private void verifyPlacementPolicy(final MiniDFSCluster cluster,
LocatedBlock lb = DFSTestUtil.getAllBlocks(dfs, file).get(0);
BlockInfo blockInfo =
Iterator<DatanodeStorageInfo> itr = blockInfo.getStorageInfos();
LOG.info("Block " + blockInfo + " storages: ");
Iterator<DatanodeStorageInfo> itr = blockInfo.getStorageInfos();
while (itr.hasNext()) {
DatanodeStorageInfo dn = itr.next();
LOG.info(" Rack: " + dn.getDatanodeDescriptor().getNetworkLocation()

View File

@ -97,14 +97,13 @@ public void testCheckBlockReportLease() throws Exception {
DelayAnswer delayer = new DelayAnswer(BlockManager.LOG);
ExecutorService pool = Executors.newFixedThreadPool(1);
// Trigger sendBlockReport
BlockReportContext brContext = new BlockReportContext(1, 0,
rand.nextLong(), hbResponse.getFullBlockReportLeaseId(), true);
rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
Future<DatanodeCommand> sendBRfuturea = pool.submit(() -> {
// Build every storage with 100 blocks for sending report
DatanodeStorage[] datanodeStorages

View File

@ -46,6 +46,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
public class TestReconstructStripedBlocksWithRackAwareness {
@ -172,7 +173,9 @@ public void testReconstructForNotEnoughRacks() throws Exception {
// we now should have 9 internal blocks distributed in 5 racks
Set<String> rackSet = new HashSet<>();
for (DatanodeStorageInfo storage : blockInfo.storages) {
Iterator<DatanodeStorageInfo> it = blockInfo.getStorageInfos();
while (it.hasNext()){
DatanodeStorageInfo storage = it.next();
Assert.assertEquals("rackSet size is wrong: " + rackSet, dataBlocks - 1,
@ -203,7 +206,9 @@ public void testReconstructForNotEnoughRacks() throws Exception {
// check if redundancy monitor correctly schedule the reconstruction work.
boolean scheduled = false;
for (int i = 0; i < 5; i++) { // retry 5 times
for (DatanodeStorageInfo storage : blockInfo.storages) {
it = blockInfo.getStorageInfos();
while (it.hasNext()){
DatanodeStorageInfo storage = it.next();
if (storage != null) {
DatanodeDescriptor dn = storage.getDatanodeDescriptor();
Assert.assertEquals("Block to be erasure coded is wrong for datanode:"

View File

@ -1507,7 +1507,7 @@ public StorageReport[] getStorageReports(String bpid) {
public List<ReplicaInfo> getSortedFinalizedBlocks(String bpid) {
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
throw new UnsupportedOperationException();

View File

@ -19,8 +19,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -118,13 +116,12 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException {
StorageBlockReport reports[] =
new StorageBlockReport[cluster.getStoragesPerDatanode()];
ArrayList<ReplicaInfo> blocks = new ArrayList<>();
ArrayList<Replica> blocks = new ArrayList<>();
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
Block localBlock = locatedBlock.getBlock().getLocalBlock();
blocks.add(new FinalizedReplica(localBlock, null, null));
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
@ -137,7 +134,7 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException {
// Should not assert!
cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
// Get the block locations once again.
locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);

View File

@ -58,7 +58,6 @@ public class TestLargeBlockReport {
private DatanodeStorage dnStorage;
private final long reportId = 1;
private final long fullBrLeaseId = 0;
private final boolean sorted = true;
public static void init() {
@ -84,7 +83,7 @@ public void testBlockReportExceedsLengthLimit() throws Exception {
StorageBlockReport[] reports = createReports(6000000);
try {
nnProxy.blockReport(bpRegistration, bpId, reports,
new BlockReportContext(1, 0, reportId, fullBrLeaseId, sorted));
new BlockReportContext(1, 0, reportId, fullBrLeaseId));
fail("Should have failed because of the too long RPC data length");
} catch (Exception e) {
// Expected. We can't reliably assert anything about the exception type
@ -99,7 +98,7 @@ public void testBlockReportSucceedsWithLargerLengthLimit() throws Exception {
StorageBlockReport[] reports = createReports(6000000);
nnProxy.blockReport(bpRegistration, bpId, reports,
new BlockReportContext(1, 0, reportId, fullBrLeaseId, sorted));
new BlockReportContext(1, 0, reportId, fullBrLeaseId));

View File

@ -39,7 +39,7 @@ protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report };
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true));
new BlockReportContext(reports.length, i, System.nanoTime(), 0L));

View File

@ -36,6 +36,6 @@ protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException {
LOG.info("Sending combined block reports for " + dnR);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
new BlockReportContext(1, 0, System.nanoTime(), 0L));

View File

@ -90,7 +90,7 @@ public Map<String, Object> getVolumeInfoMap() {
public List<ReplicaInfo> getSortedFinalizedBlocks(String bpid) {
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
return null;

View File

@ -81,7 +81,6 @@
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.HashSet;
import java.util.List;
@ -572,41 +571,6 @@ public void testAddVolumeFailureReleasesInUseLock() throws IOException {
* This test is here primarily to catch any case where the datanode replica
* map structure is changed to a new structure which is not sorted and hence
* reading the blocks from it directly would not be sorted.
public void testSortedFinalizedBlocksAreSorted() throws IOException {
this.conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
ds.addBlockPool(BLOCKPOOL, conf);
// Load 1000 blocks with random blockIDs
for (int i=0; i<=1000; i++) {
ExtendedBlock eb = new ExtendedBlock(
BLOCKPOOL, new Random().nextInt(), 1000, 1000 + i);
// Get the sorted blocks and validate the arrayList is sorted
List<ReplicaInfo> replicaList = ds.getSortedFinalizedBlocks(BLOCKPOOL);
for (int i=0; i<replicaList.size() - 1; i++) {
if (replicaList.get(i).compareTo(replicaList.get(i+1)) > 0) {
// Not sorted so fail the test
fail("ArrayList is not sorted, and it should be");
} finally {
public void testDeletingBlocks() throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();

View File

@ -959,7 +959,7 @@ void register() throws IOException {
new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
dataNodeProto.blockReport(dnRegistration, bpid, reports,
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
@ -1241,7 +1241,7 @@ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
StorageBlockReport[] report = { new StorageBlockReport(
dn.storage, dn.getBlockReportList()) };
dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
long end = Time.now();
return end-start;

View File

@ -45,7 +45,6 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@ -351,8 +350,7 @@ public void testAddUCReplica() throws Exception {
StorageBlockReport[] reports = {new StorageBlockReport(storage,
bpId, reports,
new BlockReportContext(1, 0, System.nanoTime(), 0, true));
bpId, reports, null);
DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature()

View File

@ -127,7 +127,7 @@ public void testDeadDatanode() throws Exception {
BlockListAsLongs.EMPTY) };
try {
dnp.blockReport(reg, poolId, report,
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
fail("Expected IOException is not thrown");
} catch (IOException ex) {
// Expected

View File

@ -1,644 +0,0 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hdfs.util;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Random;
* Test of TreeSet
public class FoldedTreeSetTest {
private static Random srand;
public FoldedTreeSetTest() {
public static void setUpClass() {
long seed = System.nanoTime();
System.out.println("This run uses the random seed " + seed);
srand = new Random(seed);
public static void tearDownClass() {
public void setUp() {
public void tearDown() {
* Test of comparator method, of class TreeSet.
public void testComparator() {
Comparator<String> comparator = new Comparator<String>() {
public int compare(String o1, String o2) {
return o1.compareTo(o2);
assertEquals(null, new FoldedTreeSet<>().comparator());
assertEquals(comparator, new FoldedTreeSet<>(comparator).comparator());
FoldedTreeSet<String> set = new FoldedTreeSet<>(comparator);
assertEquals(5, set.size());
assertEquals("apa", set.get("apa"));
* Test of first method, of class TreeSet.
public void testFirst() {
FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
for (int i = 0; i < 256; i++) {
tree.add(1024 + i);
assertEquals(1024, tree.first().intValue());
for (int i = 1; i < 256; i++) {
tree.remove(1024 + i);
assertEquals(1024, tree.first().intValue());
* Test of last method, of class TreeSet.
public void testLast() {
FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
for (int i = 0; i < 256; i++) {
tree.add(1024 + i);
assertEquals(1024 + i, tree.last().intValue());
for (int i = 0; i < 255; i++) {
tree.remove(1024 + i);
assertEquals(1279, tree.last().intValue());
* Test of size method, of class TreeSet.
public void testSize() {
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
String entry = "apa";
assertEquals(0, instance.size());
assertEquals(1, instance.size());
assertEquals(0, instance.size());
* Test of isEmpty method, of class TreeSet.
public void testIsEmpty() {
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
boolean expResult = true;
boolean result = instance.isEmpty();
assertEquals(expResult, result);
assertEquals(expResult, result);
* Test of contains method, of class TreeSet.
public void testContains() {
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
String entry = "apa";
assertEquals(false, instance.contains(entry));
assertEquals(true, instance.contains(entry));
assertEquals(false, instance.contains(entry + entry));
* Test of iterator method, of class TreeSet.
public void testIterator() {
for (int iter = 0; iter < 10; iter++) {
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[64723];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
longs[i] = val.getId();
assertEquals(longs.length, set.size());
Iterator<Holder> it = set.iterator();
for (int i = 0; i < longs.length; i++) {
Holder val = it.next();
assertEquals(longs[i], val.getId());
// remove randomly to force non linear removes
if (srand.nextBoolean()) {
* Test of toArray method, of class TreeSet.
public void testToArray() {
FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
ArrayList<Integer> list = new ArrayList<>(256);
for (int i = 0; i < 256; i++) {
list.add(1024 + i);
assertArrayEquals(list.toArray(), tree.toArray());
* Test of toArray method, of class TreeSet.
public void testToArray_GenericType() {
FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
ArrayList<Integer> list = new ArrayList<>(256);
for (int i = 0; i < 256; i++) {
list.add(1024 + i);
assertArrayEquals(list.toArray(new Integer[tree.size()]), tree.toArray(new Integer[tree.size()]));
assertArrayEquals(list.toArray(new Integer[tree.size() + 100]), tree.toArray(new Integer[tree.size() + 100]));
* Test of add method, of class TreeSet.
public void testAdd() {
FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
String entry = "apa";
FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
for (int i = 512; i < 1024; i++) {
for (int i = -1024; i < -512; i++) {
for (int i = 0; i < 512; i++) {
for (int i = -512; i < 0; i++) {
assertEquals(2048, intSet.size());
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[23432];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
longs[i] = val.getId();
assertEquals(longs.length, set.size());
Iterator<Holder> it = set.iterator();
for (int i = 0; i < longs.length; i++) {
Holder val = it.next();
assertEquals(longs[i], val.getId());
// Specially constructed adds to exercise all code paths
FoldedTreeSet<Integer> specialAdds = new FoldedTreeSet<>();
// Fill node with even numbers
for (int i = 0; i < 128; i += 2) {
// Remove left and add left
// Add right and shift everything left
// Empty at both ends
// Add in the middle left to slide entries left
// Add in the middle right to slide entries right
// Add existing entry in the middle of a node
public void testAddOrReplace() {
FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
String entry = "apa";
assertEquals(entry, simpleSet.addOrReplace(entry));
FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
for (int i = 0; i < 1024; i++) {
for (int i = 0; i < 1024; i++) {
assertEquals(i, intSet.addOrReplace(i).intValue());
private static class Holder implements Comparable<Holder> {
private final long id;
public Holder(long id) {
this.id = id;
public long getId() {
return id;
public int compareTo(Holder o) {
return id < o.getId() ? -1
: id > o.getId() ? 1 : 0;
public void testRemoveWithComparator() {
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[98327];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
longs[i] = val.getId();
assertEquals(longs.length, set.size());
Comparator<Object> cmp = new Comparator<Object>() {
public int compare(Object o1, Object o2) {
long lookup = (long) o1;
long stored = ((Holder) o2).getId();
return lookup < stored ? -1
: lookup > stored ? 1 : 0;
for (long val : longs) {
set.remove(val, cmp);
assertEquals(0, set.size());
public void testGetWithComparator() {
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[32147];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
longs[i] = val.getId();
assertEquals(longs.length, set.size());
Comparator<Object> cmp = new Comparator<Object>() {
public int compare(Object o1, Object o2) {
long lookup = (long) o1;
long stored = ((Holder) o2).getId();
return lookup < stored ? -1
: lookup > stored ? 1 : 0;
for (long val : longs) {
assertEquals(val, set.get(val, cmp).getId());
public void testGet() {
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[43277];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
longs[i] = val.getId();
assertEquals(longs.length, set.size());
for (long val : longs) {
assertEquals(val, set.get(new Holder(val)).getId());
* Test of remove method, of class TreeSet.
public void testRemove() {
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
assertEquals(false, instance.remove("apa"));
assertEquals(true, instance.remove("apa"));
public void removeLeft() {
FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
for (int i = 1; i <= 320; i++) {
for (int i = 193; i < 225; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
for (int i = 129; i < 161; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
for (int i = 256; i > 224; i--) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
for (int i = 257; i < 289; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
while (!set.isEmpty()) {
public void removeRight() {
FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
for (int i = 1; i <= 320; i++) {
for (int i = 193; i < 225; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
for (int i = 192; i > 160; i--) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
for (int i = 256; i > 224; i--) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
for (int i = 320; i > 288; i--) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
while (!set.isEmpty()) {
public void removeAt() {
FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
for (int i = 1; i <= 320; i++) {
for (int i = 193; i < 225; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
for (int i = 160; i < 192; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
for (int i = 225; i < 257; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
for (int i = 288; i < 320; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
public void removeRandom() {
FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
int[] integers = new int[2048];
for (int i = 0; i < 2048; i++) {
int val = srand.nextInt();
while (set.contains(val)) {
val = srand.nextInt();
integers[i] = val;
assertEquals(2048, set.size());
for (int val : integers) {
assertEquals(true, set.remove(val));
assertEquals(false, set.remove(val));
assertEquals(true, set.isEmpty());
* Test of containsAll method, of class TreeSet.
public void testContainsAll() {
Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
assertEquals(false, instance.containsAll(list));
assertEquals(true, instance.containsAll(list));
* Test of addAll method, of class TreeSet.
public void testAddAll() {
Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
assertEquals(true, instance.addAll(list));
assertEquals(false, instance.addAll(list)); // add same entries again
* Test of retainAll method, of class TreeSet.
public void testRetainAll() {
Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
assertEquals(false, instance.retainAll(list));
assertEquals(2, instance.size());
Collection<String> list2 = Arrays.asList(new String[]{"apa"});
assertEquals(true, instance.retainAll(list2));
assertEquals(1, instance.size());
* Test of removeAll method, of class TreeSet.
public void testRemoveAll() {
Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
assertEquals(false, instance.removeAll(list));
assertEquals(true, instance.removeAll(list));
assertEquals(true, instance.isEmpty());
* Test of clear method, of class TreeSet.
public void testClear() {
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
assertEquals(true, instance.isEmpty());
assertEquals(false, instance.isEmpty());
assertEquals(true, instance.isEmpty());
public void testFillRatio() {
FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
final int size = 1024;
for (int i = 1; i <= size; i++) {
assertEquals("Iteration: " + i, 1.0, set.fillRatio(), 0.0);
for (int i = 1; i <= size / 2; i++) {
set.remove(i * 2);
// Need the max since all the removes from the last node doesn't
// affect the fill ratio
assertEquals("Iteration: " + i,
Math.max((size - i) / (double) size, 0.53125),
set.fillRatio(), 0.0);
public void testCompact() {
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[24553];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
longs[i] = val.getId();
assertEquals(longs.length, set.size());
long[] longs2 = new long[longs.length];
for (int i = 0; i < longs2.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
longs2[i] = val.getId();
assertEquals(longs.length + longs2.length, set.size());
// Create fragementation
for (long val : longs) {
assertTrue(set.remove(new Holder(val)));
assertEquals(longs2.length, set.size());
assertEquals(longs2.length, set.size());
for (long val : longs) {
assertFalse(set.remove(new Holder(val)));
for (long val : longs2) {
assertEquals(val, set.get(new Holder(val)).getId());