diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java index c0a959c9268..e873946a520 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java @@ -39,12 +39,12 @@ public interface BlockCollection { public ContentSummary computeContentSummary(BlockStoragePolicySuite bsps); /** - * @return the number of blocks + * @return the number of blocks or block groups */ public int numBlocks(); /** - * Get the blocks. + * Get the blocks or block groups. */ public BlockInfoContiguous[] getBlocks(); @@ -55,8 +55,8 @@ public interface BlockCollection { public long getPreferredBlockSize(); /** - * Get block replication for the collection - * @return block replication value + * Get block replication for the collection. + * @return block replication value. Return 0 if the file is erasure coded. */ public short getPreferredBlockReplication(); @@ -71,7 +71,7 @@ public interface BlockCollection { public String getName(); /** - * Set the block at the given index. + * Set the block/block-group at the given index. */ public void setBlock(int index, BlockInfoContiguous blk); @@ -79,7 +79,8 @@ public interface BlockCollection { * Convert the last block of the collection to an under-construction block * and set the locations. */ - public BlockInfoContiguousUnderConstruction setLastBlock(BlockInfoContiguous lastBlock, + public BlockInfoContiguousUnderConstruction setLastBlock( + BlockInfoContiguous lastBlock, DatanodeStorageInfo[] targets) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java index 230d9a5d1e9..fa800c572f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java @@ -218,6 +218,11 @@ public class BlockIdManager { } public static long convertToGroupID(long id) { - return id & (~(HdfsConstants.MAX_BLOCKS_IN_GROUP - 1)); + return id & (~HdfsConstants.BLOCK_GROUP_INDEX_MASK); + } + + public static int getBlockIndex(Block reportedBlock) { + return (int) (reportedBlock.getBlockId() & + HdfsConstants.BLOCK_GROUP_INDEX_MASK); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java new file mode 100644 index 00000000000..d3ea813b2eb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -0,0 +1,343 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.util.LightWeightGSet; + +import java.util.LinkedList; + +/** + * For a given block (or an erasure coding block group), BlockInfo class + * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes + * where the replicas of the block, or blocks belonging to the erasure coding + * block group, are stored. + */ +public abstract class BlockInfo extends Block + implements LightWeightGSet.LinkedElement { + private BlockCollection bc; + + /** For implementing {@link LightWeightGSet.LinkedElement} interface */ + private LightWeightGSet.LinkedElement nextLinkedElement; + + /** + * This array contains triplets of references. For each i-th storage, the + * block belongs to triplets[3*i] is the reference to the + * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are + * references to the previous and the next blocks, respectively, in the list + * of blocks belonging to this storage. + * + * Using previous and next in Object triplets is done instead of a + * {@link LinkedList} list to efficiently use memory. With LinkedList the cost + * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16 + * bytes using the triplets. + */ + protected Object[] triplets; + + /** + * Construct an entry for blocksmap + * @param size the block's replication factor, or the total number of blocks + * in the block group + */ + public BlockInfo(short size) { + this.triplets = new Object[3 * size]; + this.bc = null; + } + + public BlockInfo(Block blk, short size) { + super(blk); + this.triplets = new Object[3 * size]; + this.bc = null; + } + + public BlockCollection getBlockCollection() { + return bc; + } + + public void setBlockCollection(BlockCollection bc) { + this.bc = bc; + } + + public DatanodeDescriptor getDatanode(int index) { + DatanodeStorageInfo storage = getStorageInfo(index); + return storage == null ? null : storage.getDatanodeDescriptor(); + } + + DatanodeStorageInfo getStorageInfo(int index) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; + return (DatanodeStorageInfo)triplets[index*3]; + } + + BlockInfo getPrevious(int index) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound"; + return (BlockInfo) triplets[index*3+1]; + } + + BlockInfo getNext(int index) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound"; + return (BlockInfo) triplets[index*3+2]; + } + + void setStorageInfo(int index, DatanodeStorageInfo storage) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; + triplets[index*3] = storage; + } + + /** + * Return the previous block on the block list for the datanode at + * position index. Set the previous block on the list to "to". + * + * @param index - the datanode index + * @param to - block to be set to previous on the list of blocks + * @return current previous block on the list of blocks + */ + BlockInfo setPrevious(int index, BlockInfo to) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound"; + BlockInfo info = (BlockInfo) triplets[index*3+1]; + triplets[index*3+1] = to; + return info; + } + + /** + * Return the next block on the block list for the datanode at + * position index. Set the next block on the list to "to". + * + * @param index - the datanode index + * @param to - block to be set to next on the list of blocks + * @return current next block on the list of blocks + */ + BlockInfo setNext(int index, BlockInfo to) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound"; + BlockInfo info = (BlockInfo) triplets[index*3+2]; + triplets[index*3+2] = to; + return info; + } + + public int getCapacity() { + assert this.triplets != null : "BlockInfo is not initialized"; + assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + return triplets.length / 3; + } + + /** + * Count the number of data-nodes the block currently belongs to (i.e., NN + * has received block reports from the DN). + */ + public abstract int numNodes(); + + /** + * Add a {@link DatanodeStorageInfo} location for a block + * @param storage The storage to add + * @param reportedBlock The block reported from the datanode. This is only + * used by erasure coded blocks, this block's id contains + * information indicating the index of the block in the + * corresponding block group. + */ + abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock); + + /** + * Remove {@link DatanodeStorageInfo} location for a block + */ + abstract boolean removeStorage(DatanodeStorageInfo storage); + + /** + * Replace the current BlockInfo with the new one in corresponding + * DatanodeStorageInfo's linked list + */ + abstract void replaceBlock(BlockInfo newBlock); + + /** + * Find specified DatanodeDescriptor. + * @return index or -1 if not found. + */ + boolean findDatanode(DatanodeDescriptor dn) { + int len = getCapacity(); + for (int idx = 0; idx < len; idx++) { + DatanodeDescriptor cur = getDatanode(idx); + if(cur == dn) { + return true; + } + } + return false; + } + + /** + * Find specified DatanodeStorageInfo. + * @return DatanodeStorageInfo or null if not found. + */ + DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) { + int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if(cur != null && cur.getDatanodeDescriptor() == dn) { + return cur; + } + } + return null; + } + + /** + * Find specified DatanodeStorageInfo. + * @return index or -1 if not found. + */ + int findStorageInfo(DatanodeStorageInfo storageInfo) { + int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if (cur == storageInfo) { + return idx; + } + } + return -1; + } + + /** + * Insert this block into the head of the list of blocks + * related to the specified DatanodeStorageInfo. + * If the head is null then form a new list. + * @return current block as the new head of the list. + */ + BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) { + int dnIndex = this.findStorageInfo(storage); + assert dnIndex >= 0 : "Data node is not found: current"; + assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : + "Block is already in the list and cannot be inserted."; + this.setPrevious(dnIndex, null); + this.setNext(dnIndex, head); + if (head != null) { + head.setPrevious(head.findStorageInfo(storage), this); + } + return this; + } + + /** + * Remove this block from the list of blocks + * related to the specified DatanodeStorageInfo. + * If this block is the head of the list then return the next block as + * the new head. + * @return the new head of the list or null if the list becomes + * empy after deletion. + */ + BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) { + if (head == null) { + return null; + } + int dnIndex = this.findStorageInfo(storage); + if (dnIndex < 0) { // this block is not on the data-node list + return head; + } + + BlockInfo next = this.getNext(dnIndex); + BlockInfo prev = this.getPrevious(dnIndex); + this.setNext(dnIndex, null); + this.setPrevious(dnIndex, null); + if (prev != null) { + prev.setNext(prev.findStorageInfo(storage), next); + } + if (next != null) { + next.setPrevious(next.findStorageInfo(storage), prev); + } + if (this == head) { // removing the head + head = next; + } + return head; + } + + /** + * Remove this block from the list of blocks related to the specified + * DatanodeDescriptor. Insert it into the head of the list of blocks. + * + * @return the new head of the list. + */ + public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage, + int curIndex, int headIndex) { + if (head == this) { + return this; + } + BlockInfo next = this.setNext(curIndex, head); + BlockInfo prev = this.setPrevious(curIndex, null); + + head.setPrevious(headIndex, this); + prev.setNext(prev.findStorageInfo(storage), next); + if (next != null) { + next.setPrevious(next.findStorageInfo(storage), prev); + } + return this; + } + + /** + * BlockInfo represents a block that is not being constructed. + * In order to start modifying the block, the BlockInfo should be converted + * to {@link BlockInfoContiguousUnderConstruction}. + * @return {@link HdfsServerConstants.BlockUCState#COMPLETE} + */ + public HdfsServerConstants.BlockUCState getBlockUCState() { + return HdfsServerConstants.BlockUCState.COMPLETE; + } + + /** + * Is this block complete? + * + * @return true if the state of the block is + * {@link HdfsServerConstants.BlockUCState#COMPLETE} + */ + public boolean isComplete() { + return getBlockUCState().equals(HdfsServerConstants.BlockUCState.COMPLETE); + } + + public boolean isDeleted() { + return (bc == null); + } + + @Override + public int hashCode() { + // Super implementation is sufficient + return super.hashCode(); + } + + @Override + public boolean equals(Object obj) { + // Sufficient to rely on super's implementation + return (this == obj) || super.equals(obj); + } + + @Override + public LightWeightGSet.LinkedElement getNext() { + return nextLinkedElement; + } + + @Override + public void setNext(LightWeightGSet.LinkedElement next) { + this.nextLinkedElement = next; + } + + static BlockInfo copyOf(BlockInfo b) { + if (b instanceof BlockInfoContiguous) { + return new BlockInfoContiguous((BlockInfoContiguous) b); + } else { + return new BlockInfoStriped((BlockInfoStriped) b); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index 769046b1f5c..e30e022eaa5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@ -17,66 +17,34 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; -import java.util.LinkedList; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; -import org.apache.hadoop.util.LightWeightGSet; /** - * BlockInfo class maintains for a given block - * the {@link BlockCollection} it is part of and datanodes where the replicas of - * the block are stored. + * Subclass of {@link BlockInfo}, used for a block with replication scheme. */ @InterfaceAudience.Private -public class BlockInfoContiguous extends Block - implements LightWeightGSet.LinkedElement { +public class BlockInfoContiguous extends BlockInfo { public static final BlockInfoContiguous[] EMPTY_ARRAY = {}; - private BlockCollection bc; - - /** For implementing {@link LightWeightGSet.LinkedElement} interface */ - private LightWeightGSet.LinkedElement nextLinkedElement; - - /** - * This array contains triplets of references. For each i-th storage, the - * block belongs to triplets[3*i] is the reference to the - * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are - * references to the previous and the next blocks, respectively, in the list - * of blocks belonging to this storage. - * - * Using previous and next in Object triplets is done instead of a - * {@link LinkedList} list to efficiently use memory. With LinkedList the cost - * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16 - * bytes using the triplets. - */ - private Object[] triplets; - - /** - * Construct an entry for blocksmap - * @param replication the block's replication factor - */ - public BlockInfoContiguous(short replication) { - this.triplets = new Object[3*replication]; - this.bc = null; + public BlockInfoContiguous(short size) { + super(size); } - - public BlockInfoContiguous(Block blk, short replication) { - super(blk); - this.triplets = new Object[3*replication]; - this.bc = null; + + public BlockInfoContiguous(Block blk, short size) { + super(blk, size); } /** * Copy construction. - * This is used to convert BlockInfoUnderConstruction - * @param from BlockInfo to copy from. + * This is used to convert BlockReplicationInfoUnderConstruction + * @param from BlockReplicationInfo to copy from. */ protected BlockInfoContiguous(BlockInfoContiguous from) { - super(from); + this(from, from.getBlockCollection().getBlockReplication()); this.triplets = new Object[from.triplets.length]; - this.bc = from.bc; + this.setBlockCollection(from.getBlockCollection()); } public BlockCollection getBlockCollection() { @@ -173,9 +141,10 @@ public class BlockInfoContiguous extends Block private int ensureCapacity(int num) { assert this.triplets != null : "BlockInfo is not initialized"; int last = numNodes(); - if(triplets.length >= (last+num)*3) + if (triplets.length >= (last+num)*3) { return last; - /* Not enough space left. Create a new array. Should normally + } + /* Not enough space left. Create a new array. Should normally * happen only when replication is manually increased by the user. */ Object[] old = triplets; triplets = new Object[(last+num)*3]; @@ -183,23 +152,8 @@ public class BlockInfoContiguous extends Block return last; } - /** - * Count the number of data-nodes the block belongs to. - */ - public int numNodes() { - assert this.triplets != null : "BlockInfo is not initialized"; - assert triplets.length % 3 == 0 : "Malformed BlockInfo"; - for(int idx = getCapacity()-1; idx >= 0; idx--) { - if(getDatanode(idx) != null) - return idx+1; - } - return 0; - } - - /** - * Add a {@link DatanodeStorageInfo} location for a block - */ - boolean addStorage(DatanodeStorageInfo storage) { + @Override + boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { // find the last null node int lastNode = ensureCapacity(1); setStorageInfo(lastNode, storage); @@ -208,149 +162,53 @@ public class BlockInfoContiguous extends Block return true; } - /** - * Remove {@link DatanodeStorageInfo} location for a block - */ + @Override boolean removeStorage(DatanodeStorageInfo storage) { int dnIndex = findStorageInfo(storage); - if(dnIndex < 0) // the node is not found + if (dnIndex < 0) { // the node is not found return false; - assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : - "Block is still in the list and must be removed first."; + } + assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : + "Block is still in the list and must be removed first."; // find the last not null node - int lastNode = numNodes()-1; - // replace current node triplet by the lastNode one + int lastNode = numNodes()-1; + // replace current node triplet by the lastNode one setStorageInfo(dnIndex, getStorageInfo(lastNode)); - setNext(dnIndex, getNext(lastNode)); - setPrevious(dnIndex, getPrevious(lastNode)); + setNext(dnIndex, getNext(lastNode)); + setPrevious(dnIndex, getPrevious(lastNode)); // set the last triplet to null setStorageInfo(lastNode, null); - setNext(lastNode, null); - setPrevious(lastNode, null); + setNext(lastNode, null); + setPrevious(lastNode, null); return true; } - /** - * Find specified DatanodeStorageInfo. - * @return DatanodeStorageInfo or null if not found. - */ - DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) { - int len = getCapacity(); - for(int idx = 0; idx < len; idx++) { - DatanodeStorageInfo cur = getStorageInfo(idx); - if(cur == null) - break; - if(cur.getDatanodeDescriptor() == dn) - return cur; - } - return null; - } - - /** - * Find specified DatanodeStorageInfo. - * @return index or -1 if not found. - */ - int findStorageInfo(DatanodeStorageInfo storageInfo) { - int len = getCapacity(); - for(int idx = 0; idx < len; idx++) { - DatanodeStorageInfo cur = getStorageInfo(idx); - if (cur == storageInfo) { - return idx; - } - if (cur == null) { - break; + @Override + public int numNodes() { + assert this.triplets != null : "BlockInfo is not initialized"; + assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + + for (int idx = getCapacity()-1; idx >= 0; idx--) { + if (getDatanode(idx) != null) { + return idx + 1; } } - return -1; + return 0; } - /** - * Insert this block into the head of the list of blocks - * related to the specified DatanodeStorageInfo. - * If the head is null then form a new list. - * @return current block as the new head of the list. - */ - BlockInfoContiguous listInsert(BlockInfoContiguous head, - DatanodeStorageInfo storage) { - int dnIndex = this.findStorageInfo(storage); - assert dnIndex >= 0 : "Data node is not found: current"; - assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : - "Block is already in the list and cannot be inserted."; - this.setPrevious(dnIndex, null); - this.setNext(dnIndex, head); - if(head != null) - head.setPrevious(head.findStorageInfo(storage), this); - return this; - } + @Override + void replaceBlock(BlockInfo newBlock) { + assert newBlock instanceof BlockInfoContiguous; + for (int i = this.numNodes() - 1; i >= 0; i--) { + final DatanodeStorageInfo storage = this.getStorageInfo(i); + final boolean removed = storage.removeBlock(this); + assert removed : "currentBlock not found."; - /** - * Remove this block from the list of blocks - * related to the specified DatanodeStorageInfo. - * If this block is the head of the list then return the next block as - * the new head. - * @return the new head of the list or null if the list becomes - * empy after deletion. - */ - BlockInfoContiguous listRemove(BlockInfoContiguous head, - DatanodeStorageInfo storage) { - if(head == null) - return null; - int dnIndex = this.findStorageInfo(storage); - if(dnIndex < 0) // this block is not on the data-node list - return head; - - BlockInfoContiguous next = this.getNext(dnIndex); - BlockInfoContiguous prev = this.getPrevious(dnIndex); - this.setNext(dnIndex, null); - this.setPrevious(dnIndex, null); - if(prev != null) - prev.setNext(prev.findStorageInfo(storage), next); - if(next != null) - next.setPrevious(next.findStorageInfo(storage), prev); - if(this == head) // removing the head - head = next; - return head; - } - - /** - * Remove this block from the list of blocks related to the specified - * DatanodeDescriptor. Insert it into the head of the list of blocks. - * - * @return the new head of the list. - */ - public BlockInfoContiguous moveBlockToHead(BlockInfoContiguous head, - DatanodeStorageInfo storage, int curIndex, int headIndex) { - if (head == this) { - return this; + final DatanodeStorageInfo.AddBlockResult result = storage.addBlock( + newBlock, newBlock); + assert result == DatanodeStorageInfo.AddBlockResult.ADDED : + "newBlock already exists."; } - BlockInfoContiguous next = this.setNext(curIndex, head); - BlockInfoContiguous prev = this.setPrevious(curIndex, null); - - head.setPrevious(headIndex, this); - prev.setNext(prev.findStorageInfo(storage), next); - if (next != null) { - next.setPrevious(next.findStorageInfo(storage), prev); - } - return this; - } - - /** - * BlockInfo represents a block that is not being constructed. - * In order to start modifying the block, the BlockInfo should be converted - * to {@link BlockInfoContiguousUnderConstruction}. - * @return {@link BlockUCState#COMPLETE} - */ - public BlockUCState getBlockUCState() { - return BlockUCState.COMPLETE; - } - - /** - * Is this block complete? - * - * @return true if the state of the block is {@link BlockUCState#COMPLETE} - */ - public boolean isComplete() { - return getBlockUCState().equals(BlockUCState.COMPLETE); } /** @@ -368,32 +226,10 @@ public class BlockInfoContiguous extends Block } // the block is already under construction BlockInfoContiguousUnderConstruction ucBlock = - (BlockInfoContiguousUnderConstruction)this; + (BlockInfoContiguousUnderConstruction) this; ucBlock.setBlockUCState(s); ucBlock.setExpectedLocations(targets); ucBlock.setBlockCollection(getBlockCollection()); return ucBlock; } - - @Override - public int hashCode() { - // Super implementation is sufficient - return super.hashCode(); - } - - @Override - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - - @Override - public LightWeightGSet.LinkedElement getNext() { - return nextLinkedElement; - } - - @Override - public void setNext(LightWeightGSet.LinkedElement next) { - this.nextLinkedElement = next; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java index 92153abb976..c78c9e2c455 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java @@ -59,101 +59,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { */ private Block truncateBlock; - /** - * ReplicaUnderConstruction contains information about replicas while - * they are under construction. - * The GS, the length and the state of the replica is as reported by - * the data-node. - * It is not guaranteed, but expected, that data-nodes actually have - * corresponding replicas. - */ - static class ReplicaUnderConstruction extends Block { - private final DatanodeStorageInfo expectedLocation; - private ReplicaState state; - private boolean chosenAsPrimary; - - ReplicaUnderConstruction(Block block, - DatanodeStorageInfo target, - ReplicaState state) { - super(block); - this.expectedLocation = target; - this.state = state; - this.chosenAsPrimary = false; - } - - /** - * Expected block replica location as assigned when the block was allocated. - * This defines the pipeline order. - * It is not guaranteed, but expected, that the data-node actually has - * the replica. - */ - private DatanodeStorageInfo getExpectedStorageLocation() { - return expectedLocation; - } - - /** - * Get replica state as reported by the data-node. - */ - ReplicaState getState() { - return state; - } - - /** - * Whether the replica was chosen for recovery. - */ - boolean getChosenAsPrimary() { - return chosenAsPrimary; - } - - /** - * Set replica state. - */ - void setState(ReplicaState s) { - state = s; - } - - /** - * Set whether this replica was chosen for recovery. - */ - void setChosenAsPrimary(boolean chosenAsPrimary) { - this.chosenAsPrimary = chosenAsPrimary; - } - - /** - * Is data-node the replica belongs to alive. - */ - boolean isAlive() { - return expectedLocation.getDatanodeDescriptor().isAlive; - } - - @Override // Block - public int hashCode() { - return super.hashCode(); - } - - @Override // Block - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - - @Override - public String toString() { - final StringBuilder b = new StringBuilder(50); - appendStringTo(b); - return b.toString(); - } - - @Override - public void appendStringTo(StringBuilder sb) { - sb.append("ReplicaUC[") - .append(expectedLocation) - .append("|") - .append(state) - .append("]"); - } - } - /** * Create block and set its state to * {@link BlockUCState#UNDER_CONSTRUCTION}. @@ -165,7 +70,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { /** * Create a block that is currently being constructed. */ - public BlockInfoContiguousUnderConstruction(Block blk, short replication, BlockUCState state, DatanodeStorageInfo[] targets) { + public BlockInfoContiguousUnderConstruction(Block blk, short replication, + BlockUCState state, DatanodeStorageInfo[] targets) { super(blk, replication); assert getBlockUCState() != BlockUCState.COMPLETE : "BlockInfoUnderConstruction cannot be in COMPLETE state"; @@ -191,10 +97,11 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { /** Set expected locations */ public void setExpectedLocations(DatanodeStorageInfo[] targets) { int numLocations = targets == null ? 0 : targets.length; - this.replicas = new ArrayList(numLocations); - for(int i = 0; i < numLocations; i++) - replicas.add( - new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW)); + this.replicas = new ArrayList<>(numLocations); + for(int i = 0; i < numLocations; i++) { + replicas.add(new ReplicaUnderConstruction(this, targets[i], + ReplicaState.RBW)); + } } /** @@ -204,8 +111,9 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { public DatanodeStorageInfo[] getExpectedStorageLocations() { int numLocations = replicas == null ? 0 : replicas.size(); DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; - for(int i = 0; i < numLocations; i++) + for (int i = 0; i < numLocations; i++) { storages[i] = replicas.get(i).getExpectedStorageLocation(); + } return storages; } @@ -293,17 +201,17 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { + " No blocks found, lease removed."); } boolean allLiveReplicasTriedAsPrimary = true; - for (int i = 0; i < replicas.size(); i++) { + for (ReplicaUnderConstruction replica : replicas) { // Check if all replicas have been tried or not. - if (replicas.get(i).isAlive()) { - allLiveReplicasTriedAsPrimary = - (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary()); + if (replica.isAlive()) { + allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary && + replica.getChosenAsPrimary()); } } if (allLiveReplicasTriedAsPrimary) { // Just set all the replicas to be chosen whether they are alive or not. - for (int i = 0; i < replicas.size(); i++) { - replicas.get(i).setChosenAsPrimary(false); + for (ReplicaUnderConstruction replica : replicas) { + replica.setChosenAsPrimary(false); } } long mostRecentLastUpdate = 0; @@ -324,7 +232,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { } } if (primary != null) { - primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this); + primary.getExpectedStorageLocation().getDatanodeDescriptor() + .addBlockToBeRecovered(this); primary.setChosenAsPrimary(true); NameNode.blockStateChangeLog.info( "BLOCK* {} recovery started, primary={}", this, primary); @@ -357,18 +266,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { replicas.add(new ReplicaUnderConstruction(block, storage, rState)); } - @Override // BlockInfo - // BlockInfoUnderConstruction participates in maps the same way as BlockInfo - public int hashCode() { - return super.hashCode(); - } - - @Override // BlockInfo - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - @Override public String toString() { final StringBuilder b = new StringBuilder(100); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java new file mode 100644 index 00000000000..5fff41e59f7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; + +/** + * Subclass of {@link BlockInfo}, presenting a block group in erasure coding. + * + * We still use triplets to store DatanodeStorageInfo for each block in the + * block group, as well as the previous/next block in the corresponding + * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units + * are sorted and strictly mapped to the corresponding block. + * + * Normally each block belonging to group is stored in only one DataNode. + * However, it is possible that some block is over-replicated. Thus the triplet + * array's size can be larger than (m+k). Thus currently we use an extra byte + * array to record the block index for each triplet. + */ +public class BlockInfoStriped extends BlockInfo { + private final short dataBlockNum; + private final short parityBlockNum; + /** + * Always the same size with triplets. Record the block index for each triplet + * TODO: actually this is only necessary for over-replicated block. Thus can + * be further optimized to save memory usage. + */ + private byte[] indices; + + public BlockInfoStriped(Block blk, short dataBlockNum, short parityBlockNum) { + super(blk, (short) (dataBlockNum + parityBlockNum)); + indices = new byte[dataBlockNum + parityBlockNum]; + initIndices(); + this.dataBlockNum = dataBlockNum; + this.parityBlockNum = parityBlockNum; + } + + BlockInfoStriped(BlockInfoStriped b) { + this(b, b.dataBlockNum, b.parityBlockNum); + this.setBlockCollection(b.getBlockCollection()); + } + + private short getTotalBlockNum() { + return (short) (dataBlockNum + parityBlockNum); + } + + private void initIndices() { + for (int i = 0; i < indices.length; i++) { + indices[i] = -1; + } + } + + private int findSlot() { + int i = getTotalBlockNum(); + for (; i < getCapacity(); i++) { + if (getStorageInfo(i) == null) { + return i; + } + } + // need to expand the triplet size + ensureCapacity(i + 1, true); + return i; + } + + @Override + boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { + int blockIndex = BlockIdManager.getBlockIndex(reportedBlock); + int index = blockIndex; + DatanodeStorageInfo old = getStorageInfo(index); + if (old != null && !old.equals(storage)) { // over replicated + // check if the storage has been stored + int i = findStorageInfo(storage); + if (i == -1) { + index = findSlot(); + } else { + return true; + } + } + addStorage(storage, index, blockIndex); + return true; + } + + private void addStorage(DatanodeStorageInfo storage, int index, + int blockIndex) { + setStorageInfo(index, storage); + setNext(index, null); + setPrevious(index, null); + indices[index] = (byte) blockIndex; + } + + private int findStorageInfoFromEnd(DatanodeStorageInfo storage) { + final int len = getCapacity(); + for(int idx = len - 1; idx >= 0; idx--) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if (storage.equals(cur)) { + return idx; + } + } + return -1; + } + + @Override + boolean removeStorage(DatanodeStorageInfo storage) { + int dnIndex = findStorageInfoFromEnd(storage); + if (dnIndex < 0) { // the node is not found + return false; + } + assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : + "Block is still in the list and must be removed first."; + // set the triplet to null + setStorageInfo(dnIndex, null); + setNext(dnIndex, null); + setPrevious(dnIndex, null); + indices[dnIndex] = -1; + return true; + } + + private void ensureCapacity(int totalSize, boolean keepOld) { + if (getCapacity() < totalSize) { + Object[] old = triplets; + byte[] oldIndices = indices; + triplets = new Object[totalSize * 3]; + indices = new byte[totalSize]; + initIndices(); + + if (keepOld) { + System.arraycopy(old, 0, triplets, 0, old.length); + System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length); + } + } + } + + @Override + void replaceBlock(BlockInfo newBlock) { + assert newBlock instanceof BlockInfoStriped; + BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock; + final int size = getCapacity(); + newBlockGroup.ensureCapacity(size, false); + for (int i = 0; i < size; i++) { + final DatanodeStorageInfo storage = this.getStorageInfo(i); + if (storage != null) { + final int blockIndex = indices[i]; + final boolean removed = storage.removeBlock(this); + assert removed : "currentBlock not found."; + + newBlockGroup.addStorage(storage, i, blockIndex); + storage.insertToList(newBlockGroup); + } + } + } + + @Override + public int numNodes() { + assert this.triplets != null : "BlockInfo is not initialized"; + assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + int num = 0; + for (int idx = getCapacity()-1; idx >= 0; idx--) { + if (getStorageInfo(idx) != null) { + num++; + } + } + return num; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index dda19434b84..6c0fb3031d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -599,8 +599,8 @@ public class BlockManager { * of replicas reported from data-nodes. */ private static boolean commitBlock( - final BlockInfoContiguousUnderConstruction block, final Block commitBlock) - throws IOException { + final BlockInfoContiguousUnderConstruction block, + final Block commitBlock) throws IOException { if (block.getBlockUCState() == BlockUCState.COMMITTED) return false; assert block.getNumBytes() <= commitBlock.getNumBytes() : @@ -631,7 +631,7 @@ public class BlockManager { return false; // already completed (e.g. by syncBlock) final boolean b = commitBlock( - (BlockInfoContiguousUnderConstruction) lastBlock, commitBlock); + (BlockInfoContiguousUnderConstruction)lastBlock, commitBlock); if(countNodes(lastBlock).liveReplicas() >= minReplication) completeBlock(bc, bc.numBlocks()-1, false); return b; @@ -644,15 +644,16 @@ public class BlockManager { * @throws IOException if the block does not have at least a minimal number * of replicas reported from data-nodes. */ - private BlockInfoContiguous completeBlock(final BlockCollection bc, + private BlockInfo completeBlock(final BlockCollection bc, final int blkIndex, boolean force) throws IOException { if(blkIndex < 0) return null; BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex]; - if(curBlock.isComplete()) + if (curBlock.isComplete()) return curBlock; + // TODO: support BlockInfoStripedUC BlockInfoContiguousUnderConstruction ucBlock = - (BlockInfoContiguousUnderConstruction) curBlock; + (BlockInfoContiguousUnderConstruction)curBlock; int numNodes = ucBlock.numNodes(); if (!force && numNodes < minReplication) throw new IOException("Cannot complete block: " + @@ -678,13 +679,15 @@ public class BlockManager { return blocksMap.replaceBlock(completeBlock); } - private BlockInfoContiguous completeBlock(final BlockCollection bc, - final BlockInfoContiguous block, boolean force) throws IOException { + // TODO: support BlockInfoStrippedUC + private BlockInfo completeBlock(final BlockCollection bc, + final BlockInfo block, boolean force) throws IOException { BlockInfoContiguous[] fileBlocks = bc.getBlocks(); - for(int idx = 0; idx < fileBlocks.length; idx++) - if(fileBlocks[idx] == block) { + for (int idx = 0; idx < fileBlocks.length; idx++) { + if (fileBlocks[idx] == block) { return completeBlock(bc, idx, force); } + } return block; } @@ -693,7 +696,7 @@ public class BlockManager { * regardless of whether enough replicas are present. This is necessary * when tailing edit logs as a Standby. */ - public BlockInfoContiguous forceCompleteBlock(final BlockCollection bc, + public BlockInfo forceCompleteBlock(final BlockCollection bc, final BlockInfoContiguousUnderConstruction block) throws IOException { block.commitBlock(block); return completeBlock(bc, block, true); @@ -725,8 +728,8 @@ public class BlockManager { DatanodeStorageInfo[] targets = getStorages(oldBlock); - BlockInfoContiguousUnderConstruction ucBlock = - bc.setLastBlock(oldBlock, targets); + BlockInfoContiguousUnderConstruction ucBlock = bc.setLastBlock(oldBlock, + targets); blocksMap.replaceBlock(ucBlock); // Remove block from replication queue. @@ -1027,7 +1030,7 @@ public class BlockManager { if(numBlocks == 0) { return new BlocksWithLocations(new BlockWithLocations[0]); } - Iterator iter = node.getBlockIterator(); + Iterator iter = node.getBlockIterator(); // starting from a random block int startBlock = ThreadLocalRandom.current().nextInt(numBlocks); // skip blocks @@ -1036,7 +1039,7 @@ public class BlockManager { } List results = new ArrayList(); long totalSize = 0; - BlockInfoContiguous curBlock; + BlockInfo curBlock; while(totalSizedatanode) map, according to the difference // between the old and new block report. // - Collection toAdd = new LinkedList(); + Collection toAdd = new LinkedList<>(); Collection toRemove = new TreeSet(); Collection toInvalidate = new LinkedList(); Collection toCorrupt = new LinkedList(); @@ -2000,8 +2018,9 @@ public class BlockManager { removeStoredBlock(b, node); } int numBlocksLogged = 0; - for (BlockInfoContiguous b : toAdd) { - addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog); + for (BlockInfoToAdd b : toAdd) { + addStoredBlock(b.stored, b.reported, storageInfo, null, + numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -2091,7 +2110,7 @@ public class BlockManager { continue; } - BlockInfoContiguous storedBlock = getStoredBlock(iblk); + BlockInfo storedBlock = getStoredBlock(iblk); // If block does not belong to any file, we are done. if (storedBlock == null) continue; @@ -2114,7 +2133,7 @@ public class BlockManager { // If block is under construction, add this replica to its list if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { - ((BlockInfoContiguousUnderConstruction)storedBlock) + ((BlockInfoContiguousUnderConstruction) storedBlock) .addReplicaIfNotPresent(storageInfo, iblk, reportedState); // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. So we need to update such blocks to safemode @@ -2129,14 +2148,14 @@ public class BlockManager { } //add replica if appropriate if (reportedState == ReplicaState.FINALIZED) { - addStoredBlockImmediate(storedBlock, storageInfo); + addStoredBlockImmediate(storedBlock, iblk, storageInfo); } } } private void reportDiff(DatanodeStorageInfo storageInfo, BlockListAsLongs newReport, - Collection toAdd, // add to DatanodeDescriptor + Collection toAdd, // add to DatanodeDescriptor Collection toRemove, // remove from DatanodeDescriptor Collection toInvalidate, // should be removed from DN Collection toCorrupt, // add to corrupt replicas list @@ -2144,8 +2163,10 @@ public class BlockManager { // place a delimiter in the list which separates blocks // that have been reported from those that have not - BlockInfoContiguous delimiter = new BlockInfoContiguous(new Block(), (short) 1); - AddBlockResult result = storageInfo.addBlock(delimiter); + Block delimiterBlock = new Block(); + BlockInfoContiguous delimiter = new BlockInfoContiguous(delimiterBlock, + (short) 1); + AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock); assert result == AddBlockResult.ADDED : "Delimiting block cannot be present in the node"; int headIndex = 0; //currently the delimiter is in the head of the list @@ -2157,7 +2178,7 @@ public class BlockManager { // scan the report and process newly reported blocks for (BlockReportReplica iblk : newReport) { ReplicaState iState = iblk.getState(); - BlockInfoContiguous storedBlock = processReportedBlock(storageInfo, + BlockInfo storedBlock = processReportedBlock(storageInfo, iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); // move block to the head of the list @@ -2169,8 +2190,7 @@ public class BlockManager { // collect blocks that have not been reported // all of them are next to the delimiter - Iterator it = - storageInfo.new BlockIterator(delimiter.getNext(0)); + Iterator it = storageInfo.new BlockIterator(delimiter.getNext(0)); while(it.hasNext()) toRemove.add(it.next()); storageInfo.removeBlock(delimiter); @@ -2207,10 +2227,10 @@ public class BlockManager { * @return the up-to-date stored block, if it should be kept. * Otherwise, null. */ - private BlockInfoContiguous processReportedBlock( + private BlockInfo processReportedBlock( final DatanodeStorageInfo storageInfo, final Block block, final ReplicaState reportedState, - final Collection toAdd, + final Collection toAdd, final Collection toInvalidate, final Collection toCorrupt, final Collection toUC) { @@ -2231,7 +2251,7 @@ public class BlockManager { } // find block by blockId - BlockInfoContiguous storedBlock = getStoredBlock(block); + BlockInfo storedBlock = getStoredBlock(block); if(storedBlock == null) { // If blocksMap does not contain reported block id, // the replica should be removed from the data-node. @@ -2285,7 +2305,7 @@ public class BlockManager { if (reportedState == ReplicaState.FINALIZED && (storedBlock.findStorageInfo(storageInfo) == -1 || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { - toAdd.add(storedBlock); + toAdd.add(new BlockInfoToAdd(storedBlock, block)); } return storedBlock; } @@ -2370,7 +2390,7 @@ public class BlockManager { */ private BlockToMarkCorrupt checkReplicaCorrupt( Block reported, ReplicaState reportedState, - BlockInfoContiguous storedBlock, BlockUCState ucState, + BlockInfo storedBlock, BlockUCState ucState, DatanodeDescriptor dn) { switch(reportedState) { case FINALIZED: @@ -2379,12 +2399,12 @@ public class BlockManager { case COMMITTED: if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(storedBlock, reportedGS, + return new BlockToMarkCorrupt(reported, storedBlock, reportedGS, "block is " + ucState + " and reported genstamp " + reportedGS + " does not match genstamp in block map " + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); } else if (storedBlock.getNumBytes() != reported.getNumBytes()) { - return new BlockToMarkCorrupt(storedBlock, + return new BlockToMarkCorrupt(reported, storedBlock, "block is " + ucState + " and reported length " + reported.getNumBytes() + " does not match " + "length in block map " + storedBlock.getNumBytes(), @@ -2395,8 +2415,8 @@ public class BlockManager { case UNDER_CONSTRUCTION: if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is " - + ucState + " and reported state " + reportedState + return new BlockToMarkCorrupt(reported, storedBlock, reportedGS, + "block is " + ucState + " and reported state " + reportedState + ", But reported genstamp " + reportedGS + " does not match genstamp in block map " + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); @@ -2411,7 +2431,7 @@ public class BlockManager { return null; // not corrupt } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(storedBlock, reportedGS, + return new BlockToMarkCorrupt(reported, storedBlock, reportedGS, "reported " + reportedState + " replica with genstamp " + reportedGS + " does not match COMPLETE block's genstamp in block map " + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); @@ -2426,7 +2446,7 @@ public class BlockManager { "complete with the same genstamp"); return null; } else { - return new BlockToMarkCorrupt(storedBlock, + return new BlockToMarkCorrupt(reported, storedBlock, "reported replica has invalid state " + reportedState, Reason.INVALID_STATE); } @@ -2439,11 +2459,12 @@ public class BlockManager { " on " + dn + " size " + storedBlock.getNumBytes(); // log here at WARN level since this is really a broken HDFS invariant LOG.warn(msg); - return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE); + return new BlockToMarkCorrupt(reported, storedBlock, msg, + Reason.INVALID_STATE); } } - private boolean isBlockUnderConstruction(BlockInfoContiguous storedBlock, + private boolean isBlockUnderConstruction(BlockInfo storedBlock, BlockUCState ucState, ReplicaState reportedState) { switch(reportedState) { case FINALIZED: @@ -2472,7 +2493,7 @@ public class BlockManager { if (ucBlock.reportedState == ReplicaState.FINALIZED && (block.findStorageInfo(storageInfo) < 0)) { - addStoredBlock(block, storageInfo, null, true); + addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true); } } @@ -2487,18 +2508,18 @@ public class BlockManager { * * @throws IOException */ - private void addStoredBlockImmediate(BlockInfoContiguous storedBlock, + private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported, DatanodeStorageInfo storageInfo) throws IOException { assert (storedBlock != null && namesystem.hasWriteLock()); if (!namesystem.isInStartupSafeMode() || namesystem.isPopulatingReplQueues()) { - addStoredBlock(storedBlock, storageInfo, null, false); + addStoredBlock(storedBlock, reported, storageInfo, null, false); return; } // just add it - AddBlockResult result = storageInfo.addBlock(storedBlock); + AddBlockResult result = storageInfo.addBlock(storedBlock, reported); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); @@ -2519,13 +2540,14 @@ public class BlockManager { * needed replications if this takes care of the problem. * @return the block that is stored in blockMap. */ - private Block addStoredBlock(final BlockInfoContiguous block, + private Block addStoredBlock(final BlockInfo block, + final Block reportedBlock, DatanodeStorageInfo storageInfo, DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException { assert block != null && namesystem.hasWriteLock(); - BlockInfoContiguous storedBlock; + BlockInfo storedBlock; DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); if (block instanceof BlockInfoContiguousUnderConstruction) { //refresh our copy in case the block got completed in another thread @@ -2546,7 +2568,7 @@ public class BlockManager { assert bc != null : "Block must belong to a file"; // add block to the datanode - AddBlockResult result = storageInfo.addBlock(storedBlock); + AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock); int curReplicaDelta; if (result == AddBlockResult.ADDED) { @@ -2618,13 +2640,13 @@ public class BlockManager { storedBlock + "blockMap has " + numCorruptNodes + " but corrupt replicas map has " + corruptReplicasCount); } - if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) - invalidateCorruptReplicas(storedBlock); + if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) { + invalidateCorruptReplicas(storedBlock, reportedBlock); + } return storedBlock; } - private void logAddStoredBlock(BlockInfoContiguous storedBlock, - DatanodeDescriptor node) { + private void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) { if (!blockLog.isInfoEnabled()) { return; } @@ -2651,7 +2673,7 @@ public class BlockManager { * * @param blk Block whose corrupt replicas need to be invalidated */ - private void invalidateCorruptReplicas(BlockInfoContiguous blk) { + private void invalidateCorruptReplicas(BlockInfo blk, Block reported) { Collection nodes = corruptReplicas.getNodes(blk); boolean removedFromBlocksMap = true; if (nodes == null) @@ -2661,7 +2683,7 @@ public class BlockManager { DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]); for (DatanodeDescriptor node : nodesCopy) { try { - if (!invalidateBlock(new BlockToMarkCorrupt(blk, null, + if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null, Reason.ANY), node)) { removedFromBlocksMap = false; } @@ -2730,7 +2752,7 @@ public class BlockManager { long nrInvalid = 0, nrOverReplicated = 0; long nrUnderReplicated = 0, nrPostponed = 0, nrUnderConstruction = 0; long startTimeMisReplicatedScan = Time.monotonicNow(); - Iterator blocksItr = blocksMap.getBlocks().iterator(); + Iterator blocksItr = blocksMap.getBlocks().iterator(); long totalBlocks = blocksMap.size(); replicationQueuesInitProgress = 0; long totalProcessed = 0; @@ -2742,7 +2764,7 @@ public class BlockManager { namesystem.writeLockInterruptibly(); try { while (processed < numBlocksPerIteration && blocksItr.hasNext()) { - BlockInfoContiguous block = blocksItr.next(); + BlockInfo block = blocksItr.next(); MisReplicationResult res = processMisReplicatedBlock(block); if (LOG.isTraceEnabled()) { LOG.trace("block " + block + ": " + res); @@ -2817,7 +2839,7 @@ public class BlockManager { * appropriate queues if necessary, and returns a result code indicating * what happened with it. */ - private MisReplicationResult processMisReplicatedBlock(BlockInfoContiguous block) { + private MisReplicationResult processMisReplicatedBlock(BlockInfo block) { if (block.isDeleted()) { // block does not belong to any file addToInvalidates(block); @@ -3157,14 +3179,14 @@ public class BlockManager { ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException { // blockReceived reports a finalized block - Collection toAdd = new LinkedList(); + Collection toAdd = new LinkedList<>(); Collection toInvalidate = new LinkedList(); Collection toCorrupt = new LinkedList(); Collection toUC = new LinkedList(); final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); - processReportedBlock(storageInfo, block, reportedState, - toAdd, toInvalidate, toCorrupt, toUC); + processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate, + toCorrupt, toUC); // the block is only in one of the to-do lists // if it is in none then data-node already has it assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1 @@ -3174,8 +3196,9 @@ public class BlockManager { addStoredBlockUnderConstruction(b, storageInfo); } long numBlocksLogged = 0; - for (BlockInfoContiguous b : toAdd) { - addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog); + for (BlockInfoToAdd b : toAdd) { + addStoredBlock(b.stored, b.reported, storageInfo, delHintNode, + numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -3301,7 +3324,7 @@ public class BlockManager { * @param b - the block being tested * @return count of live nodes for this block */ - int countLiveNodes(BlockInfoContiguous b) { + int countLiveNodes(BlockInfo b) { if (!namesystem.isInStartupSafeMode()) { return countNodes(b).liveReplicas(); } @@ -3380,7 +3403,7 @@ public class BlockManager { return blocksMap.size(); } - public DatanodeStorageInfo[] getStorages(BlockInfoContiguous block) { + public DatanodeStorageInfo[] getStorages(BlockInfo block) { final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()]; int i = 0; for(DatanodeStorageInfo s : blocksMap.getStorages(block)) { @@ -3409,8 +3432,8 @@ public class BlockManager { } } - public BlockInfoContiguous getStoredBlock(Block block) { - BlockInfoContiguous info = null; + public BlockInfo getStoredBlock(Block block) { + BlockInfo info = null; if (BlockIdManager.isStripedBlockID(block.getBlockId())) { info = blocksMap.getStoredBlock( new Block(BlockIdManager.convertToGroupID(block.getBlockId()))); @@ -3588,7 +3611,8 @@ public class BlockManager { public BlockInfoContiguous addBlockCollection(BlockInfoContiguous block, BlockCollection bc) { - return blocksMap.addBlockCollection(block, bc); + // TODO + return (BlockInfoContiguous) blocksMap.addBlockCollection(block, bc); } public BlockCollection getBlockCollection(Block b) { @@ -3826,7 +3850,7 @@ public class BlockManager { /** * A simple result enum for the result of - * {@link BlockManager#processMisReplicatedBlock(BlockInfoContiguous)}. + * {@link BlockManager#processMisReplicatedBlock}. */ enum MisReplicationResult { /** The block should be invalidated since it belongs to a deleted file. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index 5e7d34f0a0e..59ff030b89d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@ -20,12 +20,10 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.Iterator; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; -import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -36,10 +34,10 @@ import com.google.common.collect.Iterables; */ class BlocksMap { private static class StorageIterator implements Iterator { - private final BlockInfoContiguous blockInfo; + private final BlockInfo blockInfo; private int nextIdx = 0; - StorageIterator(BlockInfoContiguous blkInfo) { + StorageIterator(BlockInfo blkInfo) { this.blockInfo = blkInfo; } @@ -63,14 +61,14 @@ class BlocksMap { /** Constant {@link LightWeightGSet} capacity. */ private final int capacity; - private GSet blocks; + private GSet blocks; BlocksMap(int capacity) { // Use 2% of total memory to size the GSet capacity this.capacity = capacity; - this.blocks = new LightWeightGSet(capacity) { + this.blocks = new LightWeightGSet(capacity) { @Override - public Iterator iterator() { + public Iterator iterator() { SetIterator iterator = new SetIterator(); /* * Not tracking any modifications to set. As this set will be used @@ -97,15 +95,15 @@ class BlocksMap { } BlockCollection getBlockCollection(Block b) { - BlockInfoContiguous info = blocks.get(b); + BlockInfo info = blocks.get(b); return (info != null) ? info.getBlockCollection() : null; } /** * Add block b belonging to the specified block collection to the map. */ - BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, BlockCollection bc) { - BlockInfoContiguous info = blocks.get(b); + BlockInfo addBlockCollection(BlockInfo b, BlockCollection bc) { + BlockInfo info = blocks.get(b); if (info != b) { info = b; blocks.put(info); @@ -120,11 +118,12 @@ class BlocksMap { * and remove all data-node locations associated with the block. */ void removeBlock(Block block) { - BlockInfoContiguous blockInfo = blocks.remove(block); + BlockInfo blockInfo = blocks.remove(block); if (blockInfo == null) return; blockInfo.setBlockCollection(null); + // TODO: fix this logic for block group for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) { DatanodeDescriptor dn = blockInfo.getDatanode(idx); dn.removeBlock(blockInfo); // remove from the list and wipe the location @@ -132,7 +131,7 @@ class BlocksMap { } /** Returns the block object it it exists in the map. */ - BlockInfoContiguous getStoredBlock(Block b) { + BlockInfo getStoredBlock(Block b) { return blocks.get(b); } @@ -164,7 +163,7 @@ class BlocksMap { * For a block that has already been retrieved from the BlocksMap * returns {@link Iterable} of the storages the block belongs to. */ - Iterable getStorages(final BlockInfoContiguous storedBlock) { + Iterable getStorages(final BlockInfo storedBlock) { return new Iterable() { @Override public Iterator iterator() { @@ -175,7 +174,7 @@ class BlocksMap { /** counts number of containing nodes. Better than using iterator. */ int numNodes(Block b) { - BlockInfoContiguous info = blocks.get(b); + BlockInfo info = blocks.get(b); return info == null ? 0 : info.numNodes(); } @@ -185,7 +184,7 @@ class BlocksMap { * only if it does not belong to any file and data-nodes. */ boolean removeNode(Block b, DatanodeDescriptor node) { - BlockInfoContiguous info = blocks.get(b); + BlockInfo info = blocks.get(b); if (info == null) return false; @@ -203,7 +202,7 @@ class BlocksMap { return blocks.size(); } - Iterable getBlocks() { + Iterable getBlocks() { return blocks; } @@ -218,20 +217,11 @@ class BlocksMap { * @param newBlock - block for replacement * @return new block */ - BlockInfoContiguous replaceBlock(BlockInfoContiguous newBlock) { - BlockInfoContiguous currentBlock = blocks.get(newBlock); + BlockInfo replaceBlock(BlockInfo newBlock) { + BlockInfo currentBlock = blocks.get(newBlock); assert currentBlock != null : "the block if not in blocksMap"; // replace block in data-node lists - for (int i = currentBlock.numNodes() - 1; i >= 0; i--) { - final DatanodeDescriptor dn = currentBlock.getDatanode(i); - final DatanodeStorageInfo storage = currentBlock.findStorageInfo(dn); - final boolean removed = storage.removeBlock(currentBlock); - Preconditions.checkState(removed, "currentBlock not found."); - - final AddBlockResult result = storage.addBlock(newBlock); - Preconditions.checkState(result == AddBlockResult.ADDED, - "newBlock already exists."); - } + currentBlock.replaceBlock(newBlock); // replace block in the map itself blocks.put(newBlock); return newBlock; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java index bf5ece9bc16..79d77137f29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java @@ -513,8 +513,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { iter.remove(); } } - BlockInfoContiguous blockInfo = blockManager. - getStoredBlock(new Block(cblock.getBlockId())); + BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cblock.getBlockId())); String reason = findReasonForNotCaching(cblock, blockInfo); int neededCached = 0; if (reason != null) { @@ -628,8 +627,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { List pendingCached) { // To figure out which replicas can be cached, we consult the // blocksMap. We don't want to try to cache a corrupt replica, though. - BlockInfoContiguous blockInfo = blockManager. - getStoredBlock(new Block(cachedBlock.getBlockId())); + BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cachedBlock.getBlockId())); if (blockInfo == null) { LOG.debug("Block {}: can't add new cached replicas," + " because there is no record of this block " + @@ -668,7 +666,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { while (it.hasNext()) { CachedBlock cBlock = it.next(); BlockInfoContiguous info = - blockManager.getStoredBlock(new Block(cBlock.getBlockId())); + namesystem.getStoredBlock(new Block(cBlock.getBlockId())); if (info != null) { pendingBytes -= info.getNumBytes(); } @@ -678,7 +676,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { while (it.hasNext()) { CachedBlock cBlock = it.next(); BlockInfoContiguous info = - blockManager.getStoredBlock(new Block(cBlock.getBlockId())); + namesystem.getStoredBlock(new Block(cBlock.getBlockId())); if (info != null) { pendingBytes += info.getNumBytes(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 4731ad44c31..415646a5ec9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -335,7 +335,7 @@ public class DatanodeDescriptor extends DatanodeInfo { * Remove block from the list of blocks belonging to the data-node. Remove * data-node from the block. */ - boolean removeBlock(BlockInfoContiguous b) { + boolean removeBlock(BlockInfo b) { final DatanodeStorageInfo s = b.findStorageInfo(this); // if block exists on this datanode if (s != null) { @@ -348,12 +348,9 @@ public class DatanodeDescriptor extends DatanodeInfo { * Remove block from the list of blocks belonging to the data-node. Remove * data-node from the block. */ - boolean removeBlock(String storageID, BlockInfoContiguous b) { + boolean removeBlock(String storageID, BlockInfo b) { DatanodeStorageInfo s = getStorageInfo(storageID); - if (s != null) { - return s.removeBlock(b); - } - return false; + return s != null && s.removeBlock(b); } public void resetBlocks() { @@ -537,12 +534,12 @@ public class DatanodeDescriptor extends DatanodeInfo { } } - private static class BlockIterator implements Iterator { + private static class BlockIterator implements Iterator { private int index = 0; - private final List> iterators; + private final List> iterators; private BlockIterator(final DatanodeStorageInfo... storages) { - List> iterators = new ArrayList>(); + List> iterators = new ArrayList<>(); for (DatanodeStorageInfo e : storages) { iterators.add(e.getBlockIterator()); } @@ -556,7 +553,7 @@ public class DatanodeDescriptor extends DatanodeInfo { } @Override - public BlockInfoContiguous next() { + public BlockInfo next() { update(); return iterators.get(index).next(); } @@ -573,10 +570,11 @@ public class DatanodeDescriptor extends DatanodeInfo { } } - Iterator getBlockIterator() { + Iterator getBlockIterator() { return new BlockIterator(getStorageInfos()); } - Iterator getBlockIterator(final String storageID) { + + Iterator getBlockIterator(final String storageID) { return new BlockIterator(getStorageInfo(storageID)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index c6c9001d1d9..2c8b3eaba70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -24,6 +24,7 @@ import java.util.List; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -83,10 +84,10 @@ public class DatanodeStorageInfo { /** * Iterates over the list of blocks belonging to the data-node. */ - class BlockIterator implements Iterator { - private BlockInfoContiguous current; + class BlockIterator implements Iterator { + private BlockInfo current; - BlockIterator(BlockInfoContiguous head) { + BlockIterator(BlockInfo head) { this.current = head; } @@ -94,8 +95,8 @@ public class DatanodeStorageInfo { return current != null; } - public BlockInfoContiguous next() { - BlockInfoContiguous res = current; + public BlockInfo next() { + BlockInfo res = current; current = current.getNext(current.findStorageInfo(DatanodeStorageInfo.this)); return res; } @@ -115,7 +116,7 @@ public class DatanodeStorageInfo { private volatile long remaining; private long blockPoolUsed; - private volatile BlockInfoContiguous blockList = null; + private volatile BlockInfo blockList = null; private int numBlocks = 0; // The ID of the last full block report which updated this storage. @@ -229,7 +230,7 @@ public class DatanodeStorageInfo { return blockPoolUsed; } - public AddBlockResult addBlock(BlockInfoContiguous b) { + public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) { // First check whether the block belongs to a different storage // on the same DN. AddBlockResult result = AddBlockResult.ADDED; @@ -248,13 +249,21 @@ public class DatanodeStorageInfo { } // add to the head of the data-node list - b.addStorage(this); - blockList = b.listInsert(blockList, this); - numBlocks++; + b.addStorage(this, reportedBlock); + insertToList(b); return result; } - public boolean removeBlock(BlockInfoContiguous b) { + AddBlockResult addBlock(BlockInfoContiguous b) { + return addBlock(b, b); + } + + public void insertToList(BlockInfo b) { + blockList = b.listInsert(blockList, this); + numBlocks++; + } + + public boolean removeBlock(BlockInfo b) { blockList = b.listRemove(blockList, this); if (b.removeStorage(this)) { numBlocks--; @@ -268,16 +277,15 @@ public class DatanodeStorageInfo { return numBlocks; } - Iterator getBlockIterator() { + Iterator getBlockIterator() { return new BlockIterator(blockList); - } /** * Move block to the head of the list of blocks belonging to the data-node. * @return the index of the head of the blockList */ - int moveBlockToHead(BlockInfoContiguous b, int curIndex, int headIndex) { + int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) { blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex); return curIndex; } @@ -287,7 +295,7 @@ public class DatanodeStorageInfo { * @return the head of the blockList */ @VisibleForTesting - BlockInfoContiguous getBlockListHeadForTesting(){ + BlockInfo getBlockListHeadForTesting(){ return blockList; } @@ -374,6 +382,6 @@ public class DatanodeStorageInfo { } static enum AddBlockResult { - ADDED, REPLACED, ALREADY_EXIST; + ADDED, REPLACED, ALREADY_EXIST } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java new file mode 100644 index 00000000000..f4600cb74fc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; + +/** + * ReplicaUnderConstruction contains information about replicas (or blocks + * belonging to a block group) while they are under construction. + * + * The GS, the length and the state of the replica is as reported by the + * datanode. + * + * It is not guaranteed, but expected, that datanodes actually have + * corresponding replicas. + */ +class ReplicaUnderConstruction extends Block { + private final DatanodeStorageInfo expectedLocation; + private HdfsServerConstants.ReplicaState state; + private boolean chosenAsPrimary; + + ReplicaUnderConstruction(Block block, + DatanodeStorageInfo target, + HdfsServerConstants.ReplicaState state) { + super(block); + this.expectedLocation = target; + this.state = state; + this.chosenAsPrimary = false; + } + + /** + * Expected block replica location as assigned when the block was allocated. + * This defines the pipeline order. + * It is not guaranteed, but expected, that the data-node actually has + * the replica. + */ + DatanodeStorageInfo getExpectedStorageLocation() { + return expectedLocation; + } + + /** + * Get replica state as reported by the data-node. + */ + HdfsServerConstants.ReplicaState getState() { + return state; + } + + /** + * Whether the replica was chosen for recovery. + */ + boolean getChosenAsPrimary() { + return chosenAsPrimary; + } + + /** + * Set replica state. + */ + void setState(HdfsServerConstants.ReplicaState s) { + state = s; + } + + /** + * Set whether this replica was chosen for recovery. + */ + void setChosenAsPrimary(boolean chosenAsPrimary) { + this.chosenAsPrimary = chosenAsPrimary; + } + + /** + * Is data-node the replica belongs to alive. + */ + boolean isAlive() { + return expectedLocation.getDatanodeDescriptor().isAlive; + } + + @Override // Block + public int hashCode() { + return super.hashCode(); + } + + @Override // Block + public boolean equals(Object obj) { + // Sufficient to rely on super's implementation + return (this == obj) || super.equals(obj); + } + + @Override + public String toString() { + final StringBuilder b = new StringBuilder(50); + appendStringTo(b); + return b.toString(); + } + + @Override + public void appendStringTo(StringBuilder sb) { + sb.append("ReplicaUC[") + .append(expectedLocation) + .append("|") + .append(state) + .append("]"); + } +} + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 0d229e1af45..d0313dbec30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3072,7 +3072,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, src, new BlockInfoContiguous[] { b }); } } - + /** * Change the indicated filename. * @deprecated Use {@link #renameTo(String, String, boolean, @@ -3540,7 +3540,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, throw new AlreadyBeingCreatedException(message); case UNDER_CONSTRUCTION: case UNDER_RECOVERY: - final BlockInfoContiguousUnderConstruction uc = (BlockInfoContiguousUnderConstruction)lastBlock; + final BlockInfoContiguousUnderConstruction uc = + (BlockInfoContiguousUnderConstruction)lastBlock; // determine if last block was intended to be truncated Block recoveryBlock = uc.getTruncateBlock(); boolean truncateRecovery = recoveryBlock != null; @@ -3650,9 +3651,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, blockManager.checkReplication(pendingFile); } - @VisibleForTesting - BlockInfoContiguous getStoredBlock(Block block) { - return blockManager.getStoredBlock(block); + public BlockInfoContiguous getStoredBlock(Block block) { + return (BlockInfoContiguous) blockManager.getStoredBlock(block); } @Override @@ -3811,9 +3811,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i)); if (storageInfo != null) { if(copyTruncate) { - storageInfo.addBlock(truncatedBlock); + storageInfo.addBlock(truncatedBlock, truncatedBlock); } else { - storageInfo.addBlock(storedBlock); + storageInfo.addBlock(storedBlock, storedBlock); } } } @@ -4163,9 +4163,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, while (it.hasNext()) { Block b = it.next(); - BlockInfoContiguous blockInfo = blockManager.getStoredBlock(b); - if (blockInfo.getBlockCollection().getStoragePolicyID() - == lpPolicy.getId()) { + BlockInfoContiguous blockInfo = getStoredBlock(b); + if (blockInfo.getBlockCollection().getStoragePolicyID() == lpPolicy.getId()) { filesToDelete.add(blockInfo.getBlockCollection()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 0daf3673623..c535bd92587 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -243,7 +243,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { //get blockInfo Block block = new Block(Block.getBlockId(blockId)); //find which file this block belongs to - BlockInfoContiguous blockInfo = bm.getStoredBlock(block); + BlockInfoContiguous blockInfo = namenode.getNamesystem() + .getStoredBlock(block); if(blockInfo == null) { out.println("Block "+ blockId +" " + NONEXISTENT_STATUS); LOG.warn("Block "+ blockId + " " + NONEXISTENT_STATUS); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java index c4cbbc178b1..87b370a9a7d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java @@ -239,10 +239,12 @@ public class FSImageFormatPBSnapshot { FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null, pbf.getFileSize()); List bpl = pbf.getBlocksList(); + // TODO: also persist striped blocks BlockInfoContiguous[] blocks = new BlockInfoContiguous[bpl.size()]; for(int j = 0, e = bpl.size(); j < e; ++j) { Block blk = PBHelper.convert(bpl.get(j)); - BlockInfoContiguous storedBlock = fsn.getBlockManager().getStoredBlock(blk); + BlockInfoContiguous storedBlock = + (BlockInfoContiguous) fsn.getBlockManager().getStoredBlock(blk); if(storedBlock == null) { storedBlock = fsn.getBlockManager().addBlockCollection( new BlockInfoContiguous(blk, copy.getFileReplication()), file); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index a88a4592337..207d1bba8a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1608,8 +1608,8 @@ public class DFSTestUtil { */ public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn, ExtendedBlock blk) { - BlockManager bm0 = nn.getNamesystem().getBlockManager(); - BlockInfoContiguous storedBlock = bm0.getStoredBlock(blk.getLocalBlock()); + FSNamesystem fsn = nn.getNamesystem(); + BlockInfoContiguous storedBlock = fsn.getStoredBlock(blk.getLocalBlock()); assertTrue("Block " + blk + " should be under construction, " + "got: " + storedBlock, storedBlock instanceof BlockInfoContiguousUnderConstruction); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java index c5662caabaf..0500d347be3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java @@ -63,7 +63,7 @@ public class TestBlockInfo { final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1"); - boolean added = blockInfo.addStorage(storage); + boolean added = blockInfo.addStorage(storage, blockInfo); Assert.assertTrue(added); Assert.assertEquals(storage, blockInfo.getStorageInfo(0)); @@ -129,7 +129,7 @@ public class TestBlockInfo { // list length should be equal to the number of blocks we inserted LOG.info("Checking list length..."); assertEquals("Length should be MAX_BLOCK", MAX_BLOCKS, dd.numBlocks()); - Iterator it = dd.getBlockIterator(); + Iterator it = dd.getBlockIterator(); int len = 0; while (it.hasNext()) { it.next(); @@ -151,7 +151,7 @@ public class TestBlockInfo { // move head of the list to the head - this should not change the list LOG.info("Moving head to the head..."); - BlockInfoContiguous temp = dd.getBlockListHeadForTesting(); + BlockInfo temp = dd.getBlockListHeadForTesting(); curIndex = 0; headIndex = 0; dd.moveBlockToHead(temp, curIndex, headIndex); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java new file mode 100644 index 00000000000..74ddac08a2b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS; + +/** + * Test {@link BlockInfoStriped} + */ +public class TestBlockInfoStriped { + private static final int TOTAL_NUM_BLOCKS = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; + private static final long BASE_ID = -1600; + private static final Block baseBlock = new Block(BASE_ID); + private BlockInfoStriped info; + + @Before + public void setup() { + info = new BlockInfoStriped(baseBlock, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); + } + + private Block[] createReportedBlocks(int num) { + Block[] blocks = new Block[num]; + for (int i = 0; i < num; i++) { + blocks[i] = new Block(BASE_ID + i); + } + return blocks; + } + + /** + * Test adding storage and reported block + */ + @Test + public void testAddStorage() { + // first add NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS storages, i.e., a complete + // group of blocks/storages + DatanodeStorageInfo[] storageInfos = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS); + Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS); + int i = 0; + for (; i < storageInfos.length; i += 2) { + info.addStorage(storageInfos[i], blocks[i]); + Assert.assertEquals(i/2 + 1, info.numNodes()); + } + i /= 2; + for (int j = 1; j < storageInfos.length; j += 2) { + Assert.assertTrue(info.addStorage(storageInfos[j], blocks[j])); + Assert.assertEquals(i + (j+1)/2, info.numNodes()); + } + + // check + byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity()); + Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length); + i = 0; + for (DatanodeStorageInfo storage : storageInfos) { + int index = info.findStorageInfo(storage); + Assert.assertEquals(i++, index); + Assert.assertEquals(index, indices[index]); + } + + // the same block is reported from the same storage twice + i = 0; + for (DatanodeStorageInfo storage : storageInfos) { + Assert.assertTrue(info.addStorage(storage, blocks[i++])); + } + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity()); + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.numNodes()); + Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length); + i = 0; + for (DatanodeStorageInfo storage : storageInfos) { + int index = info.findStorageInfo(storage); + Assert.assertEquals(i++, index); + Assert.assertEquals(index, indices[index]); + } + + // the same block is reported from another storage + DatanodeStorageInfo[] storageInfos2 = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS * 2); + // only add the second half of info2 + for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) { + info.addStorage(storageInfos2[i], blocks[i % TOTAL_NUM_BLOCKS]); + Assert.assertEquals(i + 1, info.getCapacity()); + Assert.assertEquals(i + 1, info.numNodes()); + indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(i + 1, indices.length); + } + for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) { + int index = info.findStorageInfo(storageInfos2[i]); + Assert.assertEquals(i++, index); + Assert.assertEquals(index - TOTAL_NUM_BLOCKS, indices[index]); + } + } + + @Test + public void testRemoveStorage() { + // first add TOTAL_NUM_BLOCKS into the BlockInfoStriped + DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS); + Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS); + for (int i = 0; i < storages.length; i++) { + info.addStorage(storages[i], blocks[i]); + } + + // remove two storages + info.removeStorage(storages[0]); + info.removeStorage(storages[2]); + + // check + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity()); + Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes()); + byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices"); + for (int i = 0; i < storages.length; i++) { + int index = info.findStorageInfo(storages[i]); + if (i != 0 && i != 2) { + Assert.assertEquals(i, index); + Assert.assertEquals(index, indices[index]); + } else { + Assert.assertEquals(-1, index); + Assert.assertEquals(-1, indices[i]); + } + } + + // the same block is reported from another storage + DatanodeStorageInfo[] storages2 = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS * 2); + for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) { + info.addStorage(storages2[i], blocks[i % TOTAL_NUM_BLOCKS]); + } + // now we should have 8 storages + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.numNodes()); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity()); + indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length); + int j = TOTAL_NUM_BLOCKS; + for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) { + int index = info.findStorageInfo(storages2[i]); + if (i == TOTAL_NUM_BLOCKS || i == TOTAL_NUM_BLOCKS + 2) { + Assert.assertEquals(i - TOTAL_NUM_BLOCKS, index); + } else { + Assert.assertEquals(j++, index); + } + } + + // remove the storages from storages2 + for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) { + info.removeStorage(storages2[i + TOTAL_NUM_BLOCKS]); + } + // now we should have 3 storages + Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes()); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity()); + indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length); + for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) { + if (i == 0 || i == 2) { + int index = info.findStorageInfo(storages2[i + TOTAL_NUM_BLOCKS]); + Assert.assertEquals(-1, index); + } else { + int index = info.findStorageInfo(storages[i]); + Assert.assertEquals(i, index); + } + } + for (int i = TOTAL_NUM_BLOCKS; i < TOTAL_NUM_BLOCKS * 2 - 2; i++) { + Assert.assertEquals(-1, indices[i]); + Assert.assertNull(info.getDatanode(i)); + } + } + + @Test + public void testReplaceBlock() { + DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS); + Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS); + // add block/storage 0, 2, 4 into the BlockInfoStriped + for (int i = 0; i < storages.length; i += 2) { + Assert.assertEquals(AddBlockResult.ADDED, + storages[i].addBlock(info, blocks[i])); + } + + BlockInfoStriped newBlockInfo = new BlockInfoStriped(info); + info.replaceBlock(newBlockInfo); + + // make sure the newBlockInfo is correct + byte[] indices = (byte[]) Whitebox.getInternalState(newBlockInfo, "indices"); + for (int i = 0; i < storages.length; i += 2) { + int index = newBlockInfo.findStorageInfo(storages[i]); + Assert.assertEquals(i, index); + Assert.assertEquals(index, indices[i]); + + // make sure the newBlockInfo is added to the linked list of the storage + Assert.assertSame(newBlockInfo, storages[i].getBlockListHeadForTesting()); + Assert.assertEquals(1, storages[i].numBlocks()); + Assert.assertNull(newBlockInfo.getNext()); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 58210c1ffb9..4612797ab9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -383,7 +383,7 @@ public class TestBlockManager { for (int i = 1; i < pipeline.length; i++) { DatanodeStorageInfo storage = pipeline[i]; bm.addBlock(storage, blockInfo, null); - blockInfo.addStorage(storage); + blockInfo.addStorage(storage, blockInfo); } } @@ -393,7 +393,7 @@ public class TestBlockManager { for (DatanodeDescriptor dn : nodes) { for (DatanodeStorageInfo storage : dn.getStorageInfos()) { - blockInfo.addStorage(storage); + blockInfo.addStorage(storage, blockInfo); } } return blockInfo; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 3226578a5b1..2834aadbe0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -1244,7 +1244,7 @@ public class TestReplicationPolicy { when(storage.removeBlock(any(BlockInfoContiguous.class))).thenReturn(true); when(storage.addBlock(any(BlockInfoContiguous.class))).thenReturn (DatanodeStorageInfo.AddBlockResult.ADDED); - ucBlock.addStorage(storage); + ucBlock.addStorage(storage, ucBlock); when(mbc.setLastBlock((BlockInfoContiguous) any(), (DatanodeStorageInfo[]) any())) .thenReturn(ucBlock);