HDFS-7716. Erasure Coding: extend BlockInfo to handle EC info. Contributed by Jing Zhao.

Jing Zhao 2015-02-10 17:54:10 -08:00 committed by Zhe Zhang
parent b29f3bde4d
commit ba93714920
20 changed files with 1122 additions and 503 deletions

BlockCollection.java

@ -39,12 +39,12 @@ public interface BlockCollection {
public ContentSummary computeContentSummary(BlockStoragePolicySuite bsps);
/**
* @return the number of blocks
* @return the number of blocks or block groups
*/
public int numBlocks();
/**
* Get the blocks.
* Get the blocks or block groups.
*/
public BlockInfoContiguous[] getBlocks();
@ -55,8 +55,8 @@ public interface BlockCollection {
public long getPreferredBlockSize();
/**
* Get block replication for the collection
* @return block replication value
* Get block replication for the collection.
* @return block replication value. Return 0 if the file is erasure coded.
*/
public short getPreferredBlockReplication();
@ -71,7 +71,7 @@ public interface BlockCollection {
public String getName();
/**
* Set the block at the given index.
* Set the block/block-group at the given index.
*/
public void setBlock(int index, BlockInfoContiguous blk);
@ -79,7 +79,8 @@ public interface BlockCollection {
* Convert the last block of the collection to an under-construction block
* and set the locations.
*/
public BlockInfoContiguousUnderConstruction setLastBlock(BlockInfoContiguous lastBlock,
public BlockInfoContiguousUnderConstruction setLastBlock(
BlockInfoContiguous lastBlock,
DatanodeStorageInfo[] targets) throws IOException;
/**

BlockIdManager.java

@ -218,6 +218,11 @@ public class BlockIdManager {
}
public static long convertToGroupID(long id) {
return id & (~(HdfsConstants.MAX_BLOCKS_IN_GROUP - 1));
return id & (~HdfsConstants.BLOCK_GROUP_INDEX_MASK);
}
public static int getBlockIndex(Block reportedBlock) {
return (int) (reportedBlock.getBlockId() &
HdfsConstants.BLOCK_GROUP_INDEX_MASK);
}
}
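
The two methods above are pure bit masking: a striped block's low ID bits encode its index within the group, and clearing them recovers the group ID. A minimal sketch of the relationship, assuming BLOCK_GROUP_INDEX_MASK is MAX_BLOCKS_IN_GROUP - 1 with MAX_BLOCKS_IN_GROUP a power of two (16 here); the demo values and class name are invented:

public class BlockGroupIdDemo {
  // Assumed values; the real constants live in HdfsConstants.
  static final long MAX_BLOCKS_IN_GROUP = 16;
  static final long BLOCK_GROUP_INDEX_MASK = MAX_BLOCKS_IN_GROUP - 1; // 0xF

  static long convertToGroupID(long id) {
    return id & ~BLOCK_GROUP_INDEX_MASK; // clear the index bits
  }

  static int getBlockIndex(long id) {
    return (int) (id & BLOCK_GROUP_INDEX_MASK); // keep only the index bits
  }

  public static void main(String[] args) {
    long groupId = 0x1000;        // hypothetical group ID, low bits zero
    long reported = groupId + 5;  // ID of the 6th internal block
    System.out.println(convertToGroupID(reported) == groupId); // true
    System.out.println(getBlockIndex(reported));               // 5
  }
}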

BlockInfo.java

@ -0,0 +1,343 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.util.LightWeightGSet;
import java.util.LinkedList;
/**
* For a given block (or an erasure coding block group), BlockInfo class
* maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes
* where the replicas of the block, or blocks belonging to the erasure coding
* block group, are stored.
*/
public abstract class BlockInfo extends Block
implements LightWeightGSet.LinkedElement {
private BlockCollection bc;
/** For implementing {@link LightWeightGSet.LinkedElement} interface */
private LightWeightGSet.LinkedElement nextLinkedElement;
/**
* This array contains triplets of references. For the i-th storage the block
* belongs to, triplets[3*i] is the reference to the
* {@link DatanodeStorageInfo}, and triplets[3*i+1] and triplets[3*i+2] are
* references to the previous and the next blocks, respectively, in the list
* of blocks belonging to this storage.
*
* Using previous and next in Object triplets is done instead of a
* {@link LinkedList} list to efficiently use memory. With LinkedList the cost
* per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
* bytes using the triplets.
*/
protected Object[] triplets;
/**
* Construct an entry for blocksmap
* @param size the block's replication factor, or the total number of blocks
* in the block group
*/
public BlockInfo(short size) {
this.triplets = new Object[3 * size];
this.bc = null;
}
public BlockInfo(Block blk, short size) {
super(blk);
this.triplets = new Object[3 * size];
this.bc = null;
}
public BlockCollection getBlockCollection() {
return bc;
}
public void setBlockCollection(BlockCollection bc) {
this.bc = bc;
}
public DatanodeDescriptor getDatanode(int index) {
DatanodeStorageInfo storage = getStorageInfo(index);
return storage == null ? null : storage.getDatanodeDescriptor();
}
DatanodeStorageInfo getStorageInfo(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
return (DatanodeStorageInfo)triplets[index*3];
}
BlockInfo getPrevious(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
return (BlockInfo) triplets[index*3+1];
}
BlockInfo getNext(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
return (BlockInfo) triplets[index*3+2];
}
void setStorageInfo(int index, DatanodeStorageInfo storage) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
triplets[index*3] = storage;
}
/**
* Return the previous block on the block list for the datanode at
* position index. Set the previous block on the list to "to".
*
* @param index - the datanode index
* @param to - block to be set to previous on the list of blocks
* @return current previous block on the list of blocks
*/
BlockInfo setPrevious(int index, BlockInfo to) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo) triplets[index*3+1];
triplets[index*3+1] = to;
return info;
}
/**
* Return the next block on the block list for the datanode at
* position index. Set the next block on the list to "to".
*
* @param index - the datanode index
* @param to - block to be set to next on the list of blocks
* @return current next block on the list of blocks
*/
BlockInfo setNext(int index, BlockInfo to) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo) triplets[index*3+2];
triplets[index*3+2] = to;
return info;
}
public int getCapacity() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
return triplets.length / 3;
}
/**
* Count the number of data-nodes the block currently belongs to (i.e., NN
* has received block reports from the DN).
*/
public abstract int numNodes();
/**
* Add a {@link DatanodeStorageInfo} location for a block
* @param storage The storage to add
* @param reportedBlock The block reported from the datanode. This is only
* used by erasure coded blocks; the reported block's id contains
* information indicating the index of the block in the
* corresponding block group.
*/
abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
/**
* Remove {@link DatanodeStorageInfo} location for a block
*/
abstract boolean removeStorage(DatanodeStorageInfo storage);
/**
* Replace the current BlockInfo with the new one in corresponding
* DatanodeStorageInfo's linked list
*/
abstract void replaceBlock(BlockInfo newBlock);
/**
* Find specified DatanodeDescriptor.
* @return true if the given datanode is found.
*/
boolean findDatanode(DatanodeDescriptor dn) {
int len = getCapacity();
for (int idx = 0; idx < len; idx++) {
DatanodeDescriptor cur = getDatanode(idx);
if(cur == dn) {
return true;
}
}
return false;
}
/**
* Find specified DatanodeStorageInfo.
* @return DatanodeStorageInfo or null if not found.
*/
DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
int len = getCapacity();
for(int idx = 0; idx < len; idx++) {
DatanodeStorageInfo cur = getStorageInfo(idx);
if(cur != null && cur.getDatanodeDescriptor() == dn) {
return cur;
}
}
return null;
}
/**
* Find specified DatanodeStorageInfo.
* @return index or -1 if not found.
*/
int findStorageInfo(DatanodeStorageInfo storageInfo) {
int len = getCapacity();
for(int idx = 0; idx < len; idx++) {
DatanodeStorageInfo cur = getStorageInfo(idx);
if (cur == storageInfo) {
return idx;
}
}
return -1;
}
/**
* Insert this block into the head of the list of blocks
* related to the specified DatanodeStorageInfo.
* If the head is null then form a new list.
* @return current block as the new head of the list.
*/
BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
int dnIndex = this.findStorageInfo(storage);
assert dnIndex >= 0 : "Data node is not found: current";
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is already in the list and cannot be inserted.";
this.setPrevious(dnIndex, null);
this.setNext(dnIndex, head);
if (head != null) {
head.setPrevious(head.findStorageInfo(storage), this);
}
return this;
}
/**
* Remove this block from the list of blocks
* related to the specified DatanodeStorageInfo.
* If this block is the head of the list then return the next block as
* the new head.
* @return the new head of the list or null if the list becomes
* empty after deletion.
*/
BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
if (head == null) {
return null;
}
int dnIndex = this.findStorageInfo(storage);
if (dnIndex < 0) { // this block is not on the data-node list
return head;
}
BlockInfo next = this.getNext(dnIndex);
BlockInfo prev = this.getPrevious(dnIndex);
this.setNext(dnIndex, null);
this.setPrevious(dnIndex, null);
if (prev != null) {
prev.setNext(prev.findStorageInfo(storage), next);
}
if (next != null) {
next.setPrevious(next.findStorageInfo(storage), prev);
}
if (this == head) { // removing the head
head = next;
}
return head;
}
/**
* Remove this block from the list of blocks related to the specified
* DatanodeDescriptor. Insert it into the head of the list of blocks.
*
* @return the new head of the list.
*/
public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
int curIndex, int headIndex) {
if (head == this) {
return this;
}
BlockInfo next = this.setNext(curIndex, head);
BlockInfo prev = this.setPrevious(curIndex, null);
head.setPrevious(headIndex, this);
prev.setNext(prev.findStorageInfo(storage), next);
if (next != null) {
next.setPrevious(next.findStorageInfo(storage), prev);
}
return this;
}
/**
* BlockInfo represents a block that is not being constructed.
* In order to start modifying the block, the BlockInfo should be converted
* to {@link BlockInfoContiguousUnderConstruction}.
* @return {@link HdfsServerConstants.BlockUCState#COMPLETE}
*/
public HdfsServerConstants.BlockUCState getBlockUCState() {
return HdfsServerConstants.BlockUCState.COMPLETE;
}
/**
* Is this block complete?
*
* @return true if the state of the block is
* {@link HdfsServerConstants.BlockUCState#COMPLETE}
*/
public boolean isComplete() {
return getBlockUCState().equals(HdfsServerConstants.BlockUCState.COMPLETE);
}
public boolean isDeleted() {
return (bc == null);
}
@Override
public int hashCode() {
// Super implementation is sufficient
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
// Sufficient to rely on super's implementation
return (this == obj) || super.equals(obj);
}
@Override
public LightWeightGSet.LinkedElement getNext() {
return nextLinkedElement;
}
@Override
public void setNext(LightWeightGSet.LinkedElement next) {
this.nextLinkedElement = next;
}
static BlockInfo copyOf(BlockInfo b) {
if (b instanceof BlockInfoContiguous) {
return new BlockInfoContiguous((BlockInfoContiguous) b);
} else {
return new BlockInfoStriped((BlockInfoStriped) b);
}
}
}
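
The triplets array above is effectively an intrusive doubly-linked list: each block is simultaneously a node in one list per storage that holds it, with prev/next pointers stored inline instead of in LinkedList entries. A stripped-down, single-slot sketch of listInsert and listRemove (all names here are invented, not Hadoop code):

public class TripletListDemo {
  // One "slot" per node for brevity; BlockInfo keeps one slot per storage.
  static class Node {
    Node prev, next;

    // Equivalent of BlockInfo#listInsert: push this node onto the head.
    Node listInsert(Node head) {
      this.prev = null;
      this.next = head;
      if (head != null) {
        head.prev = this;
      }
      return this; // this node is the new head
    }

    // Equivalent of BlockInfo#listRemove: unlink and return the new head.
    Node listRemove(Node head) {
      if (prev != null) prev.next = next;
      if (next != null) next.prev = prev;
      Node newHead = (this == head) ? next : head;
      prev = next = null;
      return newHead;
    }
  }

  public static void main(String[] args) {
    Node a = new Node(), b = new Node();
    Node head = a.listInsert(null); // list: a
    head = b.listInsert(head);      // list: b -> a
    head = a.listRemove(head);      // list: b
    System.out.println(head == b);  // true
  }
}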

BlockInfoContiguous.java

@ -17,66 +17,34 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.LinkedList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.util.LightWeightGSet;
/**
* BlockInfo class maintains for a given block
* the {@link BlockCollection} it is part of and datanodes where the replicas of
* the block are stored.
* Subclass of {@link BlockInfo}, used for a block with replication scheme.
*/
@InterfaceAudience.Private
public class BlockInfoContiguous extends Block
implements LightWeightGSet.LinkedElement {
public class BlockInfoContiguous extends BlockInfo {
public static final BlockInfoContiguous[] EMPTY_ARRAY = {};
private BlockCollection bc;
/** For implementing {@link LightWeightGSet.LinkedElement} interface */
private LightWeightGSet.LinkedElement nextLinkedElement;
/**
* This array contains triplets of references. For the i-th storage the block
* belongs to, triplets[3*i] is the reference to the
* {@link DatanodeStorageInfo}, and triplets[3*i+1] and triplets[3*i+2] are
* references to the previous and the next blocks, respectively, in the list
* of blocks belonging to this storage.
*
* Using previous and next in Object triplets is done instead of a
* {@link LinkedList} list to efficiently use memory. With LinkedList the cost
* per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
* bytes using the triplets.
*/
private Object[] triplets;
/**
* Construct an entry for blocksmap
* @param replication the block's replication factor
*/
public BlockInfoContiguous(short replication) {
this.triplets = new Object[3*replication];
this.bc = null;
public BlockInfoContiguous(short size) {
super(size);
}
public BlockInfoContiguous(Block blk, short replication) {
super(blk);
this.triplets = new Object[3*replication];
this.bc = null;
public BlockInfoContiguous(Block blk, short size) {
super(blk, size);
}
/**
* Copy construction.
* This is used to convert BlockInfoUnderConstruction
* @param from BlockInfo to copy from.
* This is used to convert BlockInfoContiguousUnderConstruction
* @param from BlockInfoContiguous to copy from.
*/
protected BlockInfoContiguous(BlockInfoContiguous from) {
super(from);
this(from, from.getBlockCollection().getBlockReplication());
this.triplets = new Object[from.triplets.length];
this.bc = from.bc;
this.setBlockCollection(from.getBlockCollection());
}
public BlockCollection getBlockCollection() {
@ -173,9 +141,10 @@ public class BlockInfoContiguous extends Block
private int ensureCapacity(int num) {
assert this.triplets != null : "BlockInfo is not initialized";
int last = numNodes();
if(triplets.length >= (last+num)*3)
if (triplets.length >= (last+num)*3) {
return last;
/* Not enough space left. Create a new array. Should normally
}
/* Not enough space left. Create a new array. Should normally
* happen only when replication is manually increased by the user. */
Object[] old = triplets;
triplets = new Object[(last+num)*3];
@ -183,23 +152,8 @@ public class BlockInfoContiguous extends Block
return last;
}
/**
* Count the number of data-nodes the block belongs to.
*/
public int numNodes() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
for(int idx = getCapacity()-1; idx >= 0; idx--) {
if(getDatanode(idx) != null)
return idx+1;
}
return 0;
}
/**
* Add a {@link DatanodeStorageInfo} location for a block
*/
boolean addStorage(DatanodeStorageInfo storage) {
@Override
boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
// find the last null node
int lastNode = ensureCapacity(1);
setStorageInfo(lastNode, storage);
@ -208,149 +162,53 @@ public class BlockInfoContiguous extends Block
return true;
}
/**
* Remove {@link DatanodeStorageInfo} location for a block
*/
@Override
boolean removeStorage(DatanodeStorageInfo storage) {
int dnIndex = findStorageInfo(storage);
if(dnIndex < 0) // the node is not found
if (dnIndex < 0) { // the node is not found
return false;
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is still in the list and must be removed first.";
}
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is still in the list and must be removed first.";
// find the last not null node
int lastNode = numNodes()-1;
// replace current node triplet by the lastNode one
int lastNode = numNodes()-1;
// replace current node triplet by the lastNode one
setStorageInfo(dnIndex, getStorageInfo(lastNode));
setNext(dnIndex, getNext(lastNode));
setPrevious(dnIndex, getPrevious(lastNode));
setNext(dnIndex, getNext(lastNode));
setPrevious(dnIndex, getPrevious(lastNode));
// set the last triplet to null
setStorageInfo(lastNode, null);
setNext(lastNode, null);
setPrevious(lastNode, null);
setNext(lastNode, null);
setPrevious(lastNode, null);
return true;
}
/**
* Find specified DatanodeStorageInfo.
* @return DatanodeStorageInfo or null if not found.
*/
DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
int len = getCapacity();
for(int idx = 0; idx < len; idx++) {
DatanodeStorageInfo cur = getStorageInfo(idx);
if(cur == null)
break;
if(cur.getDatanodeDescriptor() == dn)
return cur;
}
return null;
}
/**
* Find specified DatanodeStorageInfo.
* @return index or -1 if not found.
*/
int findStorageInfo(DatanodeStorageInfo storageInfo) {
int len = getCapacity();
for(int idx = 0; idx < len; idx++) {
DatanodeStorageInfo cur = getStorageInfo(idx);
if (cur == storageInfo) {
return idx;
}
if (cur == null) {
break;
@Override
public int numNodes() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
for (int idx = getCapacity()-1; idx >= 0; idx--) {
if (getDatanode(idx) != null) {
return idx + 1;
}
}
return -1;
return 0;
}
/**
* Insert this block into the head of the list of blocks
* related to the specified DatanodeStorageInfo.
* If the head is null then form a new list.
* @return current block as the new head of the list.
*/
BlockInfoContiguous listInsert(BlockInfoContiguous head,
DatanodeStorageInfo storage) {
int dnIndex = this.findStorageInfo(storage);
assert dnIndex >= 0 : "Data node is not found: current";
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is already in the list and cannot be inserted.";
this.setPrevious(dnIndex, null);
this.setNext(dnIndex, head);
if(head != null)
head.setPrevious(head.findStorageInfo(storage), this);
return this;
}
@Override
void replaceBlock(BlockInfo newBlock) {
assert newBlock instanceof BlockInfoContiguous;
for (int i = this.numNodes() - 1; i >= 0; i--) {
final DatanodeStorageInfo storage = this.getStorageInfo(i);
final boolean removed = storage.removeBlock(this);
assert removed : "currentBlock not found.";
/**
* Remove this block from the list of blocks
* related to the specified DatanodeStorageInfo.
* If this block is the head of the list then return the next block as
* the new head.
* @return the new head of the list or null if the list becomes
* empty after deletion.
*/
BlockInfoContiguous listRemove(BlockInfoContiguous head,
DatanodeStorageInfo storage) {
if(head == null)
return null;
int dnIndex = this.findStorageInfo(storage);
if(dnIndex < 0) // this block is not on the data-node list
return head;
BlockInfoContiguous next = this.getNext(dnIndex);
BlockInfoContiguous prev = this.getPrevious(dnIndex);
this.setNext(dnIndex, null);
this.setPrevious(dnIndex, null);
if(prev != null)
prev.setNext(prev.findStorageInfo(storage), next);
if(next != null)
next.setPrevious(next.findStorageInfo(storage), prev);
if(this == head) // removing the head
head = next;
return head;
}
/**
* Remove this block from the list of blocks related to the specified
* DatanodeDescriptor. Insert it into the head of the list of blocks.
*
* @return the new head of the list.
*/
public BlockInfoContiguous moveBlockToHead(BlockInfoContiguous head,
DatanodeStorageInfo storage, int curIndex, int headIndex) {
if (head == this) {
return this;
final DatanodeStorageInfo.AddBlockResult result = storage.addBlock(
newBlock, newBlock);
assert result == DatanodeStorageInfo.AddBlockResult.ADDED :
"newBlock already exists.";
}
BlockInfoContiguous next = this.setNext(curIndex, head);
BlockInfoContiguous prev = this.setPrevious(curIndex, null);
head.setPrevious(headIndex, this);
prev.setNext(prev.findStorageInfo(storage), next);
if (next != null) {
next.setPrevious(next.findStorageInfo(storage), prev);
}
return this;
}
/**
* BlockInfo represents a block that is not being constructed.
* In order to start modifying the block, the BlockInfo should be converted
* to {@link BlockInfoContiguousUnderConstruction}.
* @return {@link BlockUCState#COMPLETE}
*/
public BlockUCState getBlockUCState() {
return BlockUCState.COMPLETE;
}
/**
* Is this block complete?
*
* @return true if the state of the block is {@link BlockUCState#COMPLETE}
*/
public boolean isComplete() {
return getBlockUCState().equals(BlockUCState.COMPLETE);
}
/**
@ -368,32 +226,10 @@ public class BlockInfoContiguous extends Block
}
// the block is already under construction
BlockInfoContiguousUnderConstruction ucBlock =
(BlockInfoContiguousUnderConstruction)this;
(BlockInfoContiguousUnderConstruction) this;
ucBlock.setBlockUCState(s);
ucBlock.setExpectedLocations(targets);
ucBlock.setBlockCollection(getBlockCollection());
return ucBlock;
}
@Override
public int hashCode() {
// Super implementation is sufficient
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
// Sufficient to rely on super's implementation
return (this == obj) || super.equals(obj);
}
@Override
public LightWeightGSet.LinkedElement getNext() {
return nextLinkedElement;
}
@Override
public void setNext(LightWeightGSet.LinkedElement next) {
this.nextLinkedElement = next;
}
}
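
removeStorage above relies on a compaction trick: instead of leaving a hole, it copies the last occupied triplet into the freed slot and nulls the tail, so occupied slots stay contiguous and numNodes() can stop at the first non-null entry when scanning from the end. A plain-array sketch of the same idea (names invented, one cell per storage for brevity):

public class CompactSlotsDemo {
  static Object[] slots = new Object[8]; // stand-in for the triplets

  static int numNodes() {
    for (int i = slots.length - 1; i >= 0; i--) {
      if (slots[i] != null) return i + 1; // first occupied slot from the end
    }
    return 0;
  }

  static boolean remove(Object storage) {
    for (int i = 0; i < slots.length; i++) {
      if (storage.equals(slots[i])) {
        int last = numNodes() - 1;
        slots[i] = slots[last]; // move the tail entry into the hole
        slots[last] = null;     // clear the tail; slots stay contiguous
        return true;
      }
    }
    return false; // the node is not found
  }

  public static void main(String[] args) {
    slots[0] = "s0"; slots[1] = "s1"; slots[2] = "s2";
    remove("s1");                   // s2 slides into slot 1
    System.out.println(numNodes()); // 2
  }
}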

BlockInfoContiguousUnderConstruction.java

@ -59,101 +59,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
*/
private Block truncateBlock;
/**
* ReplicaUnderConstruction contains information about replicas while
* they are under construction.
* The GS, the length, and the state of the replica are as reported by
* the data-node.
* It is not guaranteed, but expected, that data-nodes actually have
* corresponding replicas.
*/
static class ReplicaUnderConstruction extends Block {
private final DatanodeStorageInfo expectedLocation;
private ReplicaState state;
private boolean chosenAsPrimary;
ReplicaUnderConstruction(Block block,
DatanodeStorageInfo target,
ReplicaState state) {
super(block);
this.expectedLocation = target;
this.state = state;
this.chosenAsPrimary = false;
}
/**
* Expected block replica location as assigned when the block was allocated.
* This defines the pipeline order.
* It is not guaranteed, but expected, that the data-node actually has
* the replica.
*/
private DatanodeStorageInfo getExpectedStorageLocation() {
return expectedLocation;
}
/**
* Get replica state as reported by the data-node.
*/
ReplicaState getState() {
return state;
}
/**
* Whether the replica was chosen for recovery.
*/
boolean getChosenAsPrimary() {
return chosenAsPrimary;
}
/**
* Set replica state.
*/
void setState(ReplicaState s) {
state = s;
}
/**
* Set whether this replica was chosen for recovery.
*/
void setChosenAsPrimary(boolean chosenAsPrimary) {
this.chosenAsPrimary = chosenAsPrimary;
}
/**
* Whether the data-node the replica belongs to is alive.
*/
boolean isAlive() {
return expectedLocation.getDatanodeDescriptor().isAlive;
}
@Override // Block
public int hashCode() {
return super.hashCode();
}
@Override // Block
public boolean equals(Object obj) {
// Sufficient to rely on super's implementation
return (this == obj) || super.equals(obj);
}
@Override
public String toString() {
final StringBuilder b = new StringBuilder(50);
appendStringTo(b);
return b.toString();
}
@Override
public void appendStringTo(StringBuilder sb) {
sb.append("ReplicaUC[")
.append(expectedLocation)
.append("|")
.append(state)
.append("]");
}
}
/**
* Create block and set its state to
* {@link BlockUCState#UNDER_CONSTRUCTION}.
@ -165,7 +70,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
/**
* Create a block that is currently being constructed.
*/
public BlockInfoContiguousUnderConstruction(Block blk, short replication, BlockUCState state, DatanodeStorageInfo[] targets) {
public BlockInfoContiguousUnderConstruction(Block blk, short replication,
BlockUCState state, DatanodeStorageInfo[] targets) {
super(blk, replication);
assert getBlockUCState() != BlockUCState.COMPLETE :
"BlockInfoUnderConstruction cannot be in COMPLETE state";
@ -191,10 +97,11 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
/** Set expected locations */
public void setExpectedLocations(DatanodeStorageInfo[] targets) {
int numLocations = targets == null ? 0 : targets.length;
this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
for(int i = 0; i < numLocations; i++)
replicas.add(
new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
this.replicas = new ArrayList<>(numLocations);
for(int i = 0; i < numLocations; i++) {
replicas.add(new ReplicaUnderConstruction(this, targets[i],
ReplicaState.RBW));
}
}
/**
@ -204,8 +111,9 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
public DatanodeStorageInfo[] getExpectedStorageLocations() {
int numLocations = replicas == null ? 0 : replicas.size();
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
for(int i = 0; i < numLocations; i++)
for (int i = 0; i < numLocations; i++) {
storages[i] = replicas.get(i).getExpectedStorageLocation();
}
return storages;
}
@ -293,17 +201,17 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
+ " No blocks found, lease removed.");
}
boolean allLiveReplicasTriedAsPrimary = true;
for (int i = 0; i < replicas.size(); i++) {
for (ReplicaUnderConstruction replica : replicas) {
// Check if all replicas have been tried or not.
if (replicas.get(i).isAlive()) {
allLiveReplicasTriedAsPrimary =
(allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
if (replica.isAlive()) {
allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary &&
replica.getChosenAsPrimary());
}
}
if (allLiveReplicasTriedAsPrimary) {
// Just set all the replicas to be chosen whether they are alive or not.
for (int i = 0; i < replicas.size(); i++) {
replicas.get(i).setChosenAsPrimary(false);
for (ReplicaUnderConstruction replica : replicas) {
replica.setChosenAsPrimary(false);
}
}
long mostRecentLastUpdate = 0;
@ -324,7 +232,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
}
}
if (primary != null) {
primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
primary.getExpectedStorageLocation().getDatanodeDescriptor()
.addBlockToBeRecovered(this);
primary.setChosenAsPrimary(true);
NameNode.blockStateChangeLog.info(
"BLOCK* {} recovery started, primary={}", this, primary);
@ -357,18 +266,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
replicas.add(new ReplicaUnderConstruction(block, storage, rState));
}
@Override // BlockInfo
// BlockInfoUnderConstruction participates in maps the same way as BlockInfo
public int hashCode() {
return super.hashCode();
}
@Override // BlockInfo
public boolean equals(Object obj) {
// Sufficient to rely on super's implementation
return (this == obj) || super.equals(obj);
}
@Override
public String toString() {
final StringBuilder b = new StringBuilder(100);

BlockInfoStriped.java

@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
/**
* Subclass of {@link BlockInfo}, representing a block group in erasure coding.
*
* We still use triplets to store DatanodeStorageInfo for each block in the
* block group, as well as the previous/next block in the corresponding
* DatanodeStorageInfo. For an (m+k) block group, the first (m+k) triplet units
* are sorted and strictly mapped to the corresponding block.
*
* Normally each block belonging to the group is stored on only one DataNode.
* However, a block may become over-replicated, so the triplet array's size can
* be larger than (m+k). We therefore use an extra byte array to record the
* block index for each triplet.
*/
public class BlockInfoStriped extends BlockInfo {
private final short dataBlockNum;
private final short parityBlockNum;
/**
* Always the same size as triplets. Records the block index for each triplet.
* TODO: this is only necessary for over-replicated blocks, so it can be
* further optimized to save memory usage.
*/
private byte[] indices;
public BlockInfoStriped(Block blk, short dataBlockNum, short parityBlockNum) {
super(blk, (short) (dataBlockNum + parityBlockNum));
indices = new byte[dataBlockNum + parityBlockNum];
initIndices();
this.dataBlockNum = dataBlockNum;
this.parityBlockNum = parityBlockNum;
}
BlockInfoStriped(BlockInfoStriped b) {
this(b, b.dataBlockNum, b.parityBlockNum);
this.setBlockCollection(b.getBlockCollection());
}
private short getTotalBlockNum() {
return (short) (dataBlockNum + parityBlockNum);
}
private void initIndices() {
for (int i = 0; i < indices.length; i++) {
indices[i] = -1;
}
}
private int findSlot() {
int i = getTotalBlockNum();
for (; i < getCapacity(); i++) {
if (getStorageInfo(i) == null) {
return i;
}
}
// need to expand the triplet size
ensureCapacity(i + 1, true);
return i;
}
@Override
boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
int blockIndex = BlockIdManager.getBlockIndex(reportedBlock);
int index = blockIndex;
DatanodeStorageInfo old = getStorageInfo(index);
if (old != null && !old.equals(storage)) { // over replicated
// check if the storage has been stored
int i = findStorageInfo(storage);
if (i == -1) {
index = findSlot();
} else {
return true;
}
}
addStorage(storage, index, blockIndex);
return true;
}
private void addStorage(DatanodeStorageInfo storage, int index,
int blockIndex) {
setStorageInfo(index, storage);
setNext(index, null);
setPrevious(index, null);
indices[index] = (byte) blockIndex;
}
private int findStorageInfoFromEnd(DatanodeStorageInfo storage) {
final int len = getCapacity();
for(int idx = len - 1; idx >= 0; idx--) {
DatanodeStorageInfo cur = getStorageInfo(idx);
if (storage.equals(cur)) {
return idx;
}
}
return -1;
}
@Override
boolean removeStorage(DatanodeStorageInfo storage) {
int dnIndex = findStorageInfoFromEnd(storage);
if (dnIndex < 0) { // the node is not found
return false;
}
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is still in the list and must be removed first.";
// set the triplet to null
setStorageInfo(dnIndex, null);
setNext(dnIndex, null);
setPrevious(dnIndex, null);
indices[dnIndex] = -1;
return true;
}
private void ensureCapacity(int totalSize, boolean keepOld) {
if (getCapacity() < totalSize) {
Object[] old = triplets;
byte[] oldIndices = indices;
triplets = new Object[totalSize * 3];
indices = new byte[totalSize];
initIndices();
if (keepOld) {
System.arraycopy(old, 0, triplets, 0, old.length);
System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
}
}
}
@Override
void replaceBlock(BlockInfo newBlock) {
assert newBlock instanceof BlockInfoStriped;
BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock;
final int size = getCapacity();
newBlockGroup.ensureCapacity(size, false);
for (int i = 0; i < size; i++) {
final DatanodeStorageInfo storage = this.getStorageInfo(i);
if (storage != null) {
final int blockIndex = indices[i];
final boolean removed = storage.removeBlock(this);
assert removed : "currentBlock not found.";
newBlockGroup.addStorage(storage, i, blockIndex);
storage.insertToList(newBlockGroup);
}
}
}
@Override
public int numNodes() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
int num = 0;
for (int idx = getCapacity()-1; idx >= 0; idx--) {
if (getStorageInfo(idx) != null) {
num++;
}
}
return num;
}
}
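
addStorage above routes each reported internal block to the triplet slot matching its block index; only when that slot is already held by a different storage (over-replication) does it spill into a spare slot past (m+k), with indices[] remembering which internal block each slot holds. A condensed model of that routing for a 6+3 schema (all names invented, arrays in place of triplets):

import java.util.Arrays;

public class StripedSlotsDemo {
  static final int DATA = 6, PARITY = 3, TOTAL = DATA + PARITY;
  static Object[] storages = new Object[TOTAL];
  static byte[] indices = new byte[TOTAL];
  static { Arrays.fill(indices, (byte) -1); }

  static void add(Object storage, int blockIndex) {
    int slot = blockIndex; // slots 0..8 map 1:1 to block indices
    Object old = storages[slot];
    if (old != null && !old.equals(storage)) { // over-replicated index
      if (contains(storage)) return;           // storage already recorded
      slot = findSpareSlot();                  // spill past (m+k), may grow
    }
    storages[slot] = storage;
    indices[slot] = (byte) blockIndex;
  }

  static boolean contains(Object storage) {
    for (Object s : storages) {
      if (storage.equals(s)) return true;
    }
    return false;
  }

  static int findSpareSlot() {
    for (int i = TOTAL; i < storages.length; i++) {
      if (storages[i] == null) return i;
    }
    int i = storages.length;                 // expand, like ensureCapacity
    storages = Arrays.copyOf(storages, i + 1);
    indices = Arrays.copyOf(indices, i + 1);
    indices[i] = -1;
    return i;
  }
}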

BlockManager.java

@ -599,8 +599,8 @@ public class BlockManager {
* of replicas reported from data-nodes.
*/
private static boolean commitBlock(
final BlockInfoContiguousUnderConstruction block, final Block commitBlock)
throws IOException {
final BlockInfoContiguousUnderConstruction block,
final Block commitBlock) throws IOException {
if (block.getBlockUCState() == BlockUCState.COMMITTED)
return false;
assert block.getNumBytes() <= commitBlock.getNumBytes() :
@ -631,7 +631,7 @@ public class BlockManager {
return false; // already completed (e.g. by syncBlock)
final boolean b = commitBlock(
(BlockInfoContiguousUnderConstruction) lastBlock, commitBlock);
(BlockInfoContiguousUnderConstruction)lastBlock, commitBlock);
if(countNodes(lastBlock).liveReplicas() >= minReplication)
completeBlock(bc, bc.numBlocks()-1, false);
return b;
@ -644,15 +644,16 @@ public class BlockManager {
* @throws IOException if the block does not have at least a minimal number
* of replicas reported from data-nodes.
*/
private BlockInfoContiguous completeBlock(final BlockCollection bc,
private BlockInfo completeBlock(final BlockCollection bc,
final int blkIndex, boolean force) throws IOException {
if(blkIndex < 0)
return null;
BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex];
if(curBlock.isComplete())
if (curBlock.isComplete())
return curBlock;
// TODO: support BlockInfoStripedUC
BlockInfoContiguousUnderConstruction ucBlock =
(BlockInfoContiguousUnderConstruction) curBlock;
(BlockInfoContiguousUnderConstruction)curBlock;
int numNodes = ucBlock.numNodes();
if (!force && numNodes < minReplication)
throw new IOException("Cannot complete block: " +
@ -678,13 +679,15 @@ public class BlockManager {
return blocksMap.replaceBlock(completeBlock);
}
private BlockInfoContiguous completeBlock(final BlockCollection bc,
final BlockInfoContiguous block, boolean force) throws IOException {
// TODO: support BlockInfoStripedUC
private BlockInfo completeBlock(final BlockCollection bc,
final BlockInfo block, boolean force) throws IOException {
BlockInfoContiguous[] fileBlocks = bc.getBlocks();
for(int idx = 0; idx < fileBlocks.length; idx++)
if(fileBlocks[idx] == block) {
for (int idx = 0; idx < fileBlocks.length; idx++) {
if (fileBlocks[idx] == block) {
return completeBlock(bc, idx, force);
}
}
return block;
}
@ -693,7 +696,7 @@ public class BlockManager {
* regardless of whether enough replicas are present. This is necessary
* when tailing edit logs as a Standby.
*/
public BlockInfoContiguous forceCompleteBlock(final BlockCollection bc,
public BlockInfo forceCompleteBlock(final BlockCollection bc,
final BlockInfoContiguousUnderConstruction block) throws IOException {
block.commitBlock(block);
return completeBlock(bc, block, true);
@ -725,8 +728,8 @@ public class BlockManager {
DatanodeStorageInfo[] targets = getStorages(oldBlock);
BlockInfoContiguousUnderConstruction ucBlock =
bc.setLastBlock(oldBlock, targets);
BlockInfoContiguousUnderConstruction ucBlock = bc.setLastBlock(oldBlock,
targets);
blocksMap.replaceBlock(ucBlock);
// Remove block from replication queue.
@ -1027,7 +1030,7 @@ public class BlockManager {
if(numBlocks == 0) {
return new BlocksWithLocations(new BlockWithLocations[0]);
}
Iterator<BlockInfoContiguous> iter = node.getBlockIterator();
Iterator<BlockInfo> iter = node.getBlockIterator();
// starting from a random block
int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
// skip blocks
@ -1036,7 +1039,7 @@ public class BlockManager {
}
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
long totalSize = 0;
BlockInfoContiguous curBlock;
BlockInfo curBlock;
while(totalSize<size && iter.hasNext()) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
@ -1135,7 +1138,8 @@ public class BlockManager {
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
final DatanodeInfo dn, String storageID, String reason) throws IOException {
assert namesystem.hasWriteLock();
final BlockInfoContiguous storedBlock = getStoredBlock(blk.getLocalBlock());
final Block reportedBlock = blk.getLocalBlock();
final BlockInfo storedBlock = getStoredBlock(reportedBlock);
if (storedBlock == null) {
// Check if the replica is in the blockMap, if not
// ignore the request for now. This could happen when BlockScanner
@ -1152,7 +1156,7 @@ public class BlockManager {
+ ") does not exist");
}
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
storageID == null ? null : node.getStorageInfo(storageID),
node);
@ -1179,7 +1183,7 @@ public class BlockManager {
// Add replica to the data-node if it is not already there
if (storageInfo != null) {
storageInfo.addBlock(b.stored);
storageInfo.addBlock(b.stored, b.reportedBlock);
}
// Add this replica to corruptReplicas Map
@ -1731,41 +1735,55 @@ public class BlockManager {
this.reportedState = reportedState;
}
}
private static class BlockInfoToAdd {
final BlockInfo stored;
final Block reported;
BlockInfoToAdd(BlockInfo stored, Block reported) {
this.stored = stored;
this.reported = reported;
}
}
/**
* BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
* list of blocks that should be considered corrupt due to a block report.
*/
private static class BlockToMarkCorrupt {
/** The corrupted block in a datanode. */
final BlockInfoContiguous corrupted;
final BlockInfo corrupted;
/** The corresponding block stored in the BlockManager. */
final BlockInfoContiguous stored;
final BlockInfo stored;
/** The block reported from a datanode */
final Block reportedBlock;
/** The reason to mark corrupt. */
final String reason;
/** The reason code to be stored */
final Reason reasonCode;
BlockToMarkCorrupt(BlockInfoContiguous corrupted,
BlockInfoContiguous stored, String reason,
Reason reasonCode) {
BlockToMarkCorrupt(Block reported, BlockInfo corrupted,
BlockInfo stored, String reason, Reason reasonCode) {
Preconditions.checkNotNull(reported, "reported is null");
Preconditions.checkNotNull(corrupted, "corrupted is null");
Preconditions.checkNotNull(stored, "stored is null");
this.reportedBlock = reported;
this.corrupted = corrupted;
this.stored = stored;
this.reason = reason;
this.reasonCode = reasonCode;
}
BlockToMarkCorrupt(BlockInfoContiguous stored, String reason,
BlockToMarkCorrupt(Block reported, BlockInfo stored, String reason,
Reason reasonCode) {
this(stored, stored, reason, reasonCode);
this(reported, stored, stored, reason, reasonCode);
}
BlockToMarkCorrupt(BlockInfoContiguous stored, long gs, String reason,
Reason reasonCode) {
this(new BlockInfoContiguous(stored), stored, reason, reasonCode);
BlockToMarkCorrupt(Block reported, BlockInfo stored, long gs,
String reason, Reason reasonCode) {
this(reported, BlockInfo.copyOf(stored), stored, reason,
reasonCode);
//the corrupted block in datanode has a different generation stamp
corrupted.setGenerationStamp(gs);
}
@ -1943,7 +1961,7 @@ public class BlockManager {
break;
}
BlockInfoContiguous bi = getStoredBlock(b);
BlockInfo bi = getStoredBlock(b);
if (bi == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
@ -1983,7 +2001,7 @@ public class BlockManager {
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
//
Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
Collection<Block> toRemove = new TreeSet<Block>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
@ -2000,8 +2018,9 @@ public class BlockManager {
removeStoredBlock(b, node);
}
int numBlocksLogged = 0;
for (BlockInfoContiguous b : toAdd) {
addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
for (BlockInfoToAdd b : toAdd) {
addStoredBlock(b.stored, b.reported, storageInfo, null,
numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@ -2091,7 +2110,7 @@ public class BlockManager {
continue;
}
BlockInfoContiguous storedBlock = getStoredBlock(iblk);
BlockInfo storedBlock = getStoredBlock(iblk);
// If block does not belong to any file, we are done.
if (storedBlock == null) continue;
@ -2114,7 +2133,7 @@ public class BlockManager {
// If block is under construction, add this replica to its list
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
((BlockInfoContiguousUnderConstruction)storedBlock)
((BlockInfoContiguousUnderConstruction) storedBlock)
.addReplicaIfNotPresent(storageInfo, iblk, reportedState);
// OpenFileBlocks only inside snapshots also will be added to safemode
// threshold. So we need to update such blocks to safemode
@ -2129,14 +2148,14 @@ public class BlockManager {
}
//add replica if appropriate
if (reportedState == ReplicaState.FINALIZED) {
addStoredBlockImmediate(storedBlock, storageInfo);
addStoredBlockImmediate(storedBlock, iblk, storageInfo);
}
}
}
private void reportDiff(DatanodeStorageInfo storageInfo,
BlockListAsLongs newReport,
Collection<BlockInfoContiguous> toAdd, // add to DatanodeDescriptor
Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
@ -2144,8 +2163,10 @@ public class BlockManager {
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfoContiguous delimiter = new BlockInfoContiguous(new Block(), (short) 1);
AddBlockResult result = storageInfo.addBlock(delimiter);
Block delimiterBlock = new Block();
BlockInfoContiguous delimiter = new BlockInfoContiguous(delimiterBlock,
(short) 1);
AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
assert result == AddBlockResult.ADDED
: "Delimiting block cannot be present in the node";
int headIndex = 0; //currently the delimiter is in the head of the list
@ -2157,7 +2178,7 @@ public class BlockManager {
// scan the report and process newly reported blocks
for (BlockReportReplica iblk : newReport) {
ReplicaState iState = iblk.getState();
BlockInfoContiguous storedBlock = processReportedBlock(storageInfo,
BlockInfo storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
// move block to the head of the list
@ -2169,8 +2190,7 @@ public class BlockManager {
// collect blocks that have not been reported
// all of them are next to the delimiter
Iterator<BlockInfoContiguous> it =
storageInfo.new BlockIterator(delimiter.getNext(0));
Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0));
while(it.hasNext())
toRemove.add(it.next());
storageInfo.removeBlock(delimiter);
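
The delimiter logic in this hunk is the classic report-diff trick: a dummy block is inserted at the head of the storage's block list, every block named in the report is moved in front of it, and whatever still sits behind the delimiter afterwards was not reported and belongs in toRemove. A schematic using a plain LinkedList in place of the intrusive triplets list (names invented, not Hadoop code):

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ReportDiffDemo {
  // Returns the blocks the storage holds but the report no longer mentions.
  static List<String> unreported(LinkedList<String> storageBlocks,
                                 List<String> report) {
    final String delimiter = "DELIMITER";
    storageBlocks.addFirst(delimiter);   // everything starts "unreported"
    for (String b : report) {
      if (storageBlocks.remove(b)) {     // known block: move it to the
        storageBlocks.addFirst(b);       // head, in front of the delimiter
      }
      // unknown blocks would go to toAdd/toInvalidate/toCorrupt instead
    }
    int cut = storageBlocks.indexOf(delimiter);
    List<String> toRemove = new ArrayList<>(
        storageBlocks.subList(cut + 1, storageBlocks.size()));
    storageBlocks.remove(delimiter);
    return toRemove;
  }
}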
@ -2207,10 +2227,10 @@ public class BlockManager {
* @return the up-to-date stored block, if it should be kept.
* Otherwise, null.
*/
private BlockInfoContiguous processReportedBlock(
private BlockInfo processReportedBlock(
final DatanodeStorageInfo storageInfo,
final Block block, final ReplicaState reportedState,
final Collection<BlockInfoContiguous> toAdd,
final Collection<BlockInfoToAdd> toAdd,
final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
@ -2231,7 +2251,7 @@ public class BlockManager {
}
// find block by blockId
BlockInfoContiguous storedBlock = getStoredBlock(block);
BlockInfo storedBlock = getStoredBlock(block);
if(storedBlock == null) {
// If blocksMap does not contain reported block id,
// the replica should be removed from the data-node.
@ -2285,7 +2305,7 @@ public class BlockManager {
if (reportedState == ReplicaState.FINALIZED
&& (storedBlock.findStorageInfo(storageInfo) == -1 ||
corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
toAdd.add(storedBlock);
toAdd.add(new BlockInfoToAdd(storedBlock, block));
}
return storedBlock;
}
@ -2370,7 +2390,7 @@ public class BlockManager {
*/
private BlockToMarkCorrupt checkReplicaCorrupt(
Block reported, ReplicaState reportedState,
BlockInfoContiguous storedBlock, BlockUCState ucState,
BlockInfo storedBlock, BlockUCState ucState,
DatanodeDescriptor dn) {
switch(reportedState) {
case FINALIZED:
@ -2379,12 +2399,12 @@ public class BlockManager {
case COMMITTED:
if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
return new BlockToMarkCorrupt(storedBlock, reportedGS,
return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
"block is " + ucState + " and reported genstamp " + reportedGS
+ " does not match genstamp in block map "
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
return new BlockToMarkCorrupt(storedBlock,
return new BlockToMarkCorrupt(reported, storedBlock,
"block is " + ucState + " and reported length " +
reported.getNumBytes() + " does not match " +
"length in block map " + storedBlock.getNumBytes(),
@ -2395,8 +2415,8 @@ public class BlockManager {
case UNDER_CONSTRUCTION:
if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
+ ucState + " and reported state " + reportedState
return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
"block is " + ucState + " and reported state " + reportedState
+ ", But reported genstamp " + reportedGS
+ " does not match genstamp in block map "
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
@ -2411,7 +2431,7 @@ public class BlockManager {
return null; // not corrupt
} else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
return new BlockToMarkCorrupt(storedBlock, reportedGS,
return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
"reported " + reportedState + " replica with genstamp " + reportedGS
+ " does not match COMPLETE block's genstamp in block map "
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
@ -2426,7 +2446,7 @@ public class BlockManager {
"complete with the same genstamp");
return null;
} else {
return new BlockToMarkCorrupt(storedBlock,
return new BlockToMarkCorrupt(reported, storedBlock,
"reported replica has invalid state " + reportedState,
Reason.INVALID_STATE);
}
@ -2439,11 +2459,12 @@ public class BlockManager {
" on " + dn + " size " + storedBlock.getNumBytes();
// log here at WARN level since this is really a broken HDFS invariant
LOG.warn(msg);
return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
return new BlockToMarkCorrupt(reported, storedBlock, msg,
Reason.INVALID_STATE);
}
}
private boolean isBlockUnderConstruction(BlockInfoContiguous storedBlock,
private boolean isBlockUnderConstruction(BlockInfo storedBlock,
BlockUCState ucState, ReplicaState reportedState) {
switch(reportedState) {
case FINALIZED:
@ -2472,7 +2493,7 @@ public class BlockManager {
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
(block.findStorageInfo(storageInfo) < 0)) {
addStoredBlock(block, storageInfo, null, true);
addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
}
}
@ -2487,18 +2508,18 @@ public class BlockManager {
*
* @throws IOException
*/
private void addStoredBlockImmediate(BlockInfoContiguous storedBlock,
private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
DatanodeStorageInfo storageInfo)
throws IOException {
assert (storedBlock != null && namesystem.hasWriteLock());
if (!namesystem.isInStartupSafeMode()
|| namesystem.isPopulatingReplQueues()) {
addStoredBlock(storedBlock, storageInfo, null, false);
addStoredBlock(storedBlock, reported, storageInfo, null, false);
return;
}
// just add it
AddBlockResult result = storageInfo.addBlock(storedBlock);
AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
@ -2519,13 +2540,14 @@ public class BlockManager {
* needed replications if this takes care of the problem.
* @return the block that is stored in blockMap.
*/
private Block addStoredBlock(final BlockInfoContiguous block,
private Block addStoredBlock(final BlockInfo block,
final Block reportedBlock,
DatanodeStorageInfo storageInfo,
DatanodeDescriptor delNodeHint,
boolean logEveryBlock)
throws IOException {
assert block != null && namesystem.hasWriteLock();
BlockInfoContiguous storedBlock;
BlockInfo storedBlock;
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
if (block instanceof BlockInfoContiguousUnderConstruction) {
//refresh our copy in case the block got completed in another thread
@ -2546,7 +2568,7 @@ public class BlockManager {
assert bc != null : "Block must belong to a file";
// add block to the datanode
AddBlockResult result = storageInfo.addBlock(storedBlock);
AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
int curReplicaDelta;
if (result == AddBlockResult.ADDED) {
@ -2618,13 +2640,13 @@ public class BlockManager {
storedBlock + "blockMap has " + numCorruptNodes +
" but corrupt replicas map has " + corruptReplicasCount);
}
if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
invalidateCorruptReplicas(storedBlock);
if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
invalidateCorruptReplicas(storedBlock, reportedBlock);
}
return storedBlock;
}
private void logAddStoredBlock(BlockInfoContiguous storedBlock,
DatanodeDescriptor node) {
private void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
if (!blockLog.isInfoEnabled()) {
return;
}
@ -2651,7 +2673,7 @@ public class BlockManager {
*
* @param blk Block whose corrupt replicas need to be invalidated
*/
private void invalidateCorruptReplicas(BlockInfoContiguous blk) {
private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
boolean removedFromBlocksMap = true;
if (nodes == null)
@ -2661,7 +2683,7 @@ public class BlockManager {
DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
for (DatanodeDescriptor node : nodesCopy) {
try {
if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
Reason.ANY), node)) {
removedFromBlocksMap = false;
}
@ -2730,7 +2752,7 @@ public class BlockManager {
long nrInvalid = 0, nrOverReplicated = 0;
long nrUnderReplicated = 0, nrPostponed = 0, nrUnderConstruction = 0;
long startTimeMisReplicatedScan = Time.monotonicNow();
Iterator<BlockInfoContiguous> blocksItr = blocksMap.getBlocks().iterator();
Iterator<BlockInfo> blocksItr = blocksMap.getBlocks().iterator();
long totalBlocks = blocksMap.size();
replicationQueuesInitProgress = 0;
long totalProcessed = 0;
@ -2742,7 +2764,7 @@ public class BlockManager {
namesystem.writeLockInterruptibly();
try {
while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
BlockInfoContiguous block = blocksItr.next();
BlockInfo block = blocksItr.next();
MisReplicationResult res = processMisReplicatedBlock(block);
if (LOG.isTraceEnabled()) {
LOG.trace("block " + block + ": " + res);
@ -2817,7 +2839,7 @@ public class BlockManager {
* appropriate queues if necessary, and returns a result code indicating
* what happened with it.
*/
private MisReplicationResult processMisReplicatedBlock(BlockInfoContiguous block) {
private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
if (block.isDeleted()) {
// block does not belong to any file
addToInvalidates(block);
@ -3157,14 +3179,14 @@ public class BlockManager {
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
// blockReceived reports a finalized block
Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
processReportedBlock(storageInfo, block, reportedState,
toAdd, toInvalidate, toCorrupt, toUC);
processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
toCorrupt, toUC);
// the block is only in one of the to-do lists
// if it is in none then data-node already has it
assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
@ -3174,8 +3196,9 @@ public class BlockManager {
addStoredBlockUnderConstruction(b, storageInfo);
}
long numBlocksLogged = 0;
for (BlockInfoContiguous b : toAdd) {
addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
for (BlockInfoToAdd b : toAdd) {
addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@ -3301,7 +3324,7 @@ public class BlockManager {
* @param b - the block being tested
* @return count of live nodes for this block
*/
int countLiveNodes(BlockInfoContiguous b) {
int countLiveNodes(BlockInfo b) {
if (!namesystem.isInStartupSafeMode()) {
return countNodes(b).liveReplicas();
}
@ -3380,7 +3403,7 @@ public class BlockManager {
return blocksMap.size();
}
public DatanodeStorageInfo[] getStorages(BlockInfoContiguous block) {
public DatanodeStorageInfo[] getStorages(BlockInfo block) {
final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()];
int i = 0;
for(DatanodeStorageInfo s : blocksMap.getStorages(block)) {
@ -3409,8 +3432,8 @@ public class BlockManager {
}
}
public BlockInfoContiguous getStoredBlock(Block block) {
BlockInfoContiguous info = null;
public BlockInfo getStoredBlock(Block block) {
BlockInfo info = null;
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
info = blocksMap.getStoredBlock(
new Block(BlockIdManager.convertToGroupID(block.getBlockId())));
@ -3588,7 +3611,8 @@ public class BlockManager {
public BlockInfoContiguous addBlockCollection(BlockInfoContiguous block,
BlockCollection bc) {
return blocksMap.addBlockCollection(block, bc);
// TODO
return (BlockInfoContiguous) blocksMap.addBlockCollection(block, bc);
}
public BlockCollection getBlockCollection(Block b) {
@ -3826,7 +3850,7 @@ public class BlockManager {
/**
* A simple result enum for the result of
* {@link BlockManager#processMisReplicatedBlock(BlockInfoContiguous)}.
* {@link BlockManager#processMisReplicatedBlock}.
*/
enum MisReplicationResult {
/** The block should be invalidated since it belongs to a deleted file. */

BlocksMap.java

@ -20,12 +20,10 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
@ -36,10 +34,10 @@ import com.google.common.collect.Iterables;
*/
class BlocksMap {
private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
private final BlockInfoContiguous blockInfo;
private final BlockInfo blockInfo;
private int nextIdx = 0;
StorageIterator(BlockInfoContiguous blkInfo) {
StorageIterator(BlockInfo blkInfo) {
this.blockInfo = blkInfo;
}
@ -63,14 +61,14 @@ class BlocksMap {
/** Constant {@link LightWeightGSet} capacity. */
private final int capacity;
private GSet<Block, BlockInfoContiguous> blocks;
private GSet<Block, BlockInfo> blocks;
BlocksMap(int capacity) {
// Use 2% of total memory to size the GSet capacity
this.capacity = capacity;
this.blocks = new LightWeightGSet<Block, BlockInfoContiguous>(capacity) {
this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity) {
@Override
public Iterator<BlockInfoContiguous> iterator() {
public Iterator<BlockInfo> iterator() {
SetIterator iterator = new SetIterator();
/*
* Not tracking any modifications to set. As this set will be used
@ -97,15 +95,15 @@ class BlocksMap {
}
BlockCollection getBlockCollection(Block b) {
BlockInfoContiguous info = blocks.get(b);
BlockInfo info = blocks.get(b);
return (info != null) ? info.getBlockCollection() : null;
}
/**
* Add block b belonging to the specified block collection to the map.
*/
BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, BlockCollection bc) {
BlockInfoContiguous info = blocks.get(b);
BlockInfo addBlockCollection(BlockInfo b, BlockCollection bc) {
BlockInfo info = blocks.get(b);
if (info != b) {
info = b;
blocks.put(info);
@ -120,11 +118,12 @@ class BlocksMap {
* and remove all data-node locations associated with the block.
*/
void removeBlock(Block block) {
BlockInfoContiguous blockInfo = blocks.remove(block);
BlockInfo blockInfo = blocks.remove(block);
if (blockInfo == null)
return;
blockInfo.setBlockCollection(null);
// TODO: fix this logic for block group
for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
DatanodeDescriptor dn = blockInfo.getDatanode(idx);
dn.removeBlock(blockInfo); // remove from the list and wipe the location
@ -132,7 +131,7 @@ class BlocksMap {
}
/** Returns the block object if it exists in the map. */
BlockInfoContiguous getStoredBlock(Block b) {
BlockInfo getStoredBlock(Block b) {
return blocks.get(b);
}
@ -164,7 +163,7 @@ class BlocksMap {
* For a block that has already been retrieved from the BlocksMap
* returns {@link Iterable} of the storages the block belongs to.
*/
Iterable<DatanodeStorageInfo> getStorages(final BlockInfoContiguous storedBlock) {
Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
return new Iterable<DatanodeStorageInfo>() {
@Override
public Iterator<DatanodeStorageInfo> iterator() {
@ -175,7 +174,7 @@ class BlocksMap {
/** Counts the number of nodes containing the block. Faster than using an iterator. */
int numNodes(Block b) {
BlockInfoContiguous info = blocks.get(b);
BlockInfo info = blocks.get(b);
return info == null ? 0 : info.numNodes();
}
@ -185,7 +184,7 @@ class BlocksMap {
* only if it does not belong to any file and data-nodes.
*/
boolean removeNode(Block b, DatanodeDescriptor node) {
BlockInfoContiguous info = blocks.get(b);
BlockInfo info = blocks.get(b);
if (info == null)
return false;
@ -203,7 +202,7 @@ class BlocksMap {
return blocks.size();
}
Iterable<BlockInfoContiguous> getBlocks() {
Iterable<BlockInfo> getBlocks() {
return blocks;
}
@ -218,20 +217,11 @@ class BlocksMap {
* @param newBlock - block for replacement
* @return new block
*/
BlockInfoContiguous replaceBlock(BlockInfoContiguous newBlock) {
BlockInfoContiguous currentBlock = blocks.get(newBlock);
BlockInfo replaceBlock(BlockInfo newBlock) {
BlockInfo currentBlock = blocks.get(newBlock);
assert currentBlock != null : "the block is not in blocksMap";
// replace block in data-node lists
for (int i = currentBlock.numNodes() - 1; i >= 0; i--) {
final DatanodeDescriptor dn = currentBlock.getDatanode(i);
final DatanodeStorageInfo storage = currentBlock.findStorageInfo(dn);
final boolean removed = storage.removeBlock(currentBlock);
Preconditions.checkState(removed, "currentBlock not found.");
final AddBlockResult result = storage.addBlock(newBlock);
Preconditions.checkState(result == AddBlockResult.ADDED,
"newBlock already exists.");
}
currentBlock.replaceBlock(newBlock);
// replace block in the map itself
blocks.put(newBlock);
return newBlock;
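The per-storage swap loop removed above now lives behind currentBlock.replaceBlock(newBlock). A simplified, self-contained sketch of that contract, with plain sets standing in for the intrusive per-storage block lists (the real BlockInfo version also carries over the storage triplets):

import java.util.*;

class ReplaceDemo {
  // Replace 'current' with 'replacement' in every storage that holds it.
  static void replaceBlock(List<Set<String>> holders, String current, String replacement) {
    for (Set<String> storage : holders) {
      if (!storage.remove(current))
        throw new IllegalStateException("currentBlock not found.");
      if (!storage.add(replacement))
        throw new IllegalStateException("newBlock already exists.");
    }
  }

  public static void main(String[] args) {
    Set<String> s1 = new HashSet<>(Arrays.asList("blk_1", "blk_7"));
    replaceBlock(Collections.singletonList(s1), "blk_1", "blk_1_new");
    System.out.println(s1); // contains blk_1_new and blk_7
  }
}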

View File

@ -513,8 +513,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
iter.remove();
}
}
BlockInfoContiguous blockInfo = blockManager.
getStoredBlock(new Block(cblock.getBlockId()));
BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cblock.getBlockId()));
String reason = findReasonForNotCaching(cblock, blockInfo);
int neededCached = 0;
if (reason != null) {
@ -628,8 +627,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
List<DatanodeDescriptor> pendingCached) {
// To figure out which replicas can be cached, we consult the
// blocksMap. We don't want to try to cache a corrupt replica, though.
BlockInfoContiguous blockInfo = blockManager.
getStoredBlock(new Block(cachedBlock.getBlockId()));
BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cachedBlock.getBlockId()));
if (blockInfo == null) {
LOG.debug("Block {}: can't add new cached replicas," +
" because there is no record of this block " +
@ -668,7 +666,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
while (it.hasNext()) {
CachedBlock cBlock = it.next();
BlockInfoContiguous info =
blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
namesystem.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) {
pendingBytes -= info.getNumBytes();
}
@ -678,7 +676,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
while (it.hasNext()) {
CachedBlock cBlock = it.next();
BlockInfoContiguous info =
blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
namesystem.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) {
pendingBytes += info.getNumBytes();
}

View File

@ -335,7 +335,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.
*/
boolean removeBlock(BlockInfoContiguous b) {
boolean removeBlock(BlockInfo b) {
final DatanodeStorageInfo s = b.findStorageInfo(this);
// if block exists on this datanode
if (s != null) {
@ -348,12 +348,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.
*/
boolean removeBlock(String storageID, BlockInfoContiguous b) {
boolean removeBlock(String storageID, BlockInfo b) {
DatanodeStorageInfo s = getStorageInfo(storageID);
if (s != null) {
return s.removeBlock(b);
}
return false;
return s != null && s.removeBlock(b);
}
public void resetBlocks() {
@ -537,12 +534,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
private static class BlockIterator implements Iterator<BlockInfoContiguous> {
private static class BlockIterator implements Iterator<BlockInfo> {
private int index = 0;
private final List<Iterator<BlockInfoContiguous>> iterators;
private final List<Iterator<BlockInfo>> iterators;
private BlockIterator(final DatanodeStorageInfo... storages) {
List<Iterator<BlockInfoContiguous>> iterators = new ArrayList<Iterator<BlockInfoContiguous>>();
List<Iterator<BlockInfo>> iterators = new ArrayList<>();
for (DatanodeStorageInfo e : storages) {
iterators.add(e.getBlockIterator());
}
@ -556,7 +553,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
@Override
public BlockInfoContiguous next() {
public BlockInfo next() {
update();
return iterators.get(index).next();
}
@ -573,10 +570,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
Iterator<BlockInfoContiguous> getBlockIterator() {
Iterator<BlockInfo> getBlockIterator() {
return new BlockIterator(getStorageInfos());
}
Iterator<BlockInfoContiguous> getBlockIterator(final String storageID) {
Iterator<BlockInfo> getBlockIterator(final String storageID) {
return new BlockIterator(getStorageInfo(storageID));
}

View File

@ -24,6 +24,7 @@ import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -83,10 +84,10 @@ public class DatanodeStorageInfo {
/**
* Iterates over the list of blocks belonging to the data-node.
*/
class BlockIterator implements Iterator<BlockInfoContiguous> {
private BlockInfoContiguous current;
class BlockIterator implements Iterator<BlockInfo> {
private BlockInfo current;
BlockIterator(BlockInfoContiguous head) {
BlockIterator(BlockInfo head) {
this.current = head;
}
@ -94,8 +95,8 @@ public class DatanodeStorageInfo {
return current != null;
}
public BlockInfoContiguous next() {
BlockInfoContiguous res = current;
public BlockInfo next() {
BlockInfo res = current;
current = current.getNext(current.findStorageInfo(DatanodeStorageInfo.this));
return res;
}
@ -115,7 +116,7 @@ public class DatanodeStorageInfo {
private volatile long remaining;
private long blockPoolUsed;
private volatile BlockInfoContiguous blockList = null;
private volatile BlockInfo blockList = null;
private int numBlocks = 0;
// The ID of the last full block report which updated this storage.
@ -229,7 +230,7 @@ public class DatanodeStorageInfo {
return blockPoolUsed;
}
public AddBlockResult addBlock(BlockInfoContiguous b) {
public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
// First check whether the block belongs to a different storage
// on the same DN.
AddBlockResult result = AddBlockResult.ADDED;
@ -248,13 +249,21 @@ public class DatanodeStorageInfo {
}
// add to the head of the data-node list
b.addStorage(this);
blockList = b.listInsert(blockList, this);
numBlocks++;
b.addStorage(this, reportedBlock);
insertToList(b);
return result;
}
public boolean removeBlock(BlockInfoContiguous b) {
AddBlockResult addBlock(BlockInfoContiguous b) {
return addBlock(b, b);
}
public void insertToList(BlockInfo b) {
blockList = b.listInsert(blockList, this);
numBlocks++;
}
public boolean removeBlock(BlockInfo b) {
blockList = b.listRemove(blockList, this);
if (b.removeStorage(this)) {
numBlocks--;
@ -268,16 +277,15 @@ public class DatanodeStorageInfo {
return numBlocks;
}
Iterator<BlockInfoContiguous> getBlockIterator() {
Iterator<BlockInfo> getBlockIterator() {
return new BlockIterator(blockList);
}
/**
* Move block to the head of the list of blocks belonging to the data-node.
* @return the index of the head of the blockList
*/
int moveBlockToHead(BlockInfoContiguous b, int curIndex, int headIndex) {
int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
return curIndex;
}
@ -287,7 +295,7 @@ public class DatanodeStorageInfo {
* @return the head of the blockList
*/
@VisibleForTesting
BlockInfoContiguous getBlockListHeadForTesting(){
BlockInfo getBlockListHeadForTesting(){
return blockList;
}
@ -374,6 +382,6 @@ public class DatanodeStorageInfo {
}
static enum AddBlockResult {
ADDED, REPLACED, ALREADY_EXIST;
ADDED, REPLACED, ALREADY_EXIST
}
}
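Two things changed in this file: addBlock() now also receives the reported block, so a striped group can derive the in-group index from the reported ID, and the head insertion into the storage's intrusive block list is factored into insertToList(). A self-contained sketch of that list discipline, with a stand-in Node in place of BlockInfo's per-storage links:

class IntrusiveListDemo {
  static class Node {          // stands in for BlockInfo's per-storage links
    final String name;
    Node next;
    Node(String name) { this.name = name; }
  }

  Node head;                   // corresponds to DatanodeStorageInfo.blockList
  int numBlocks;

  void insertToList(Node b) {  // head insertion, like listInsert()
    b.next = head;
    head = b;
    numBlocks++;
  }

  public static void main(String[] args) {
    IntrusiveListDemo storage = new IntrusiveListDemo();
    storage.insertToList(new Node("blk_1"));
    storage.insertToList(new Node("blk_2"));
    System.out.println(storage.head.name + ", " + storage.numBlocks); // blk_2, 2
  }
}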

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
/**
* ReplicaUnderConstruction contains information about replicas (or blocks
* belonging to a block group) while they are under construction.
*
* The GS, the length and the state of the replica are as reported by the
* datanode.
*
* It is not guaranteed, but expected, that datanodes actually have
* corresponding replicas.
*/
class ReplicaUnderConstruction extends Block {
private final DatanodeStorageInfo expectedLocation;
private HdfsServerConstants.ReplicaState state;
private boolean chosenAsPrimary;
ReplicaUnderConstruction(Block block,
DatanodeStorageInfo target,
HdfsServerConstants.ReplicaState state) {
super(block);
this.expectedLocation = target;
this.state = state;
this.chosenAsPrimary = false;
}
/**
* Expected block replica location as assigned when the block was allocated.
* This defines the pipeline order.
* It is not guaranteed, but expected, that the data-node actually has
* the replica.
*/
DatanodeStorageInfo getExpectedStorageLocation() {
return expectedLocation;
}
/**
* Get replica state as reported by the data-node.
*/
HdfsServerConstants.ReplicaState getState() {
return state;
}
/**
* Whether the replica was chosen for recovery.
*/
boolean getChosenAsPrimary() {
return chosenAsPrimary;
}
/**
* Set replica state.
*/
void setState(HdfsServerConstants.ReplicaState s) {
state = s;
}
/**
* Set whether this replica was chosen for recovery.
*/
void setChosenAsPrimary(boolean chosenAsPrimary) {
this.chosenAsPrimary = chosenAsPrimary;
}
/**
* Whether the data-node the replica belongs to is alive.
*/
boolean isAlive() {
return expectedLocation.getDatanodeDescriptor().isAlive;
}
@Override // Block
public int hashCode() {
return super.hashCode();
}
@Override // Block
public boolean equals(Object obj) {
// Sufficient to rely on super's implementation
return (this == obj) || super.equals(obj);
}
@Override
public String toString() {
final StringBuilder b = new StringBuilder(50);
appendStringTo(b);
return b.toString();
}
@Override
public void appendStringTo(StringBuilder sb) {
sb.append("ReplicaUC[")
.append(expectedLocation)
.append("|")
.append(state)
.append("]");
}
}
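A brief sketch of the replica lifecycle this class tracks, with a local stand-in for HdfsServerConstants.ReplicaState; the transition shown is illustrative, not lifted from the recovery code:

class ReplicaUCDemo {
  // Mirrors HdfsServerConstants.ReplicaState as a local stand-in.
  enum ReplicaState { FINALIZED, RBW, RWR, RUR, TEMPORARY }

  public static void main(String[] args) {
    ReplicaState state = ReplicaState.RBW;  // as first reported by the datanode
    boolean chosenAsPrimary = false;

    // during lease recovery, one live replica is picked to drive recovery
    chosenAsPrimary = true;
    state = ReplicaState.RUR;               // replica under recovery

    System.out.println(state + " primary=" + chosenAsPrimary); // RUR primary=true
  }
}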

View File

@ -3072,7 +3072,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
src, new BlockInfoContiguous[] { b });
}
}
/**
* Change the indicated filename.
* @deprecated Use {@link #renameTo(String, String, boolean,
@ -3540,7 +3540,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new AlreadyBeingCreatedException(message);
case UNDER_CONSTRUCTION:
case UNDER_RECOVERY:
final BlockInfoContiguousUnderConstruction uc = (BlockInfoContiguousUnderConstruction)lastBlock;
final BlockInfoContiguousUnderConstruction uc =
(BlockInfoContiguousUnderConstruction)lastBlock;
// determine if last block was intended to be truncated
Block recoveryBlock = uc.getTruncateBlock();
boolean truncateRecovery = recoveryBlock != null;
@ -3650,9 +3651,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
blockManager.checkReplication(pendingFile);
}
@VisibleForTesting
BlockInfoContiguous getStoredBlock(Block block) {
return blockManager.getStoredBlock(block);
public BlockInfoContiguous getStoredBlock(Block block) {
return (BlockInfoContiguous) blockManager.getStoredBlock(block);
}
@Override
@ -3811,9 +3811,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
if (storageInfo != null) {
if(copyTruncate) {
storageInfo.addBlock(truncatedBlock);
storageInfo.addBlock(truncatedBlock, truncatedBlock);
} else {
storageInfo.addBlock(storedBlock);
storageInfo.addBlock(storedBlock, storedBlock);
}
}
}
@ -4163,9 +4163,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
while (it.hasNext()) {
Block b = it.next();
BlockInfoContiguous blockInfo = blockManager.getStoredBlock(b);
if (blockInfo.getBlockCollection().getStoragePolicyID()
== lpPolicy.getId()) {
BlockInfoContiguous blockInfo = getStoredBlock(b);
if (blockInfo.getBlockCollection().getStoragePolicyID() == lpPolicy.getId()) {
filesToDelete.add(blockInfo.getBlockCollection());
}
}

View File

@ -243,7 +243,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
//get blockInfo
Block block = new Block(Block.getBlockId(blockId));
//find which file this block belongs to
BlockInfoContiguous blockInfo = bm.getStoredBlock(block);
BlockInfoContiguous blockInfo = namenode.getNamesystem()
.getStoredBlock(block);
if(blockInfo == null) {
out.println("Block "+ blockId +" " + NONEXISTENT_STATUS);
LOG.warn("Block "+ blockId + " " + NONEXISTENT_STATUS);

View File

@ -239,10 +239,12 @@ public class FSImageFormatPBSnapshot {
FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null,
pbf.getFileSize());
List<BlockProto> bpl = pbf.getBlocksList();
// TODO: also persist striped blocks
BlockInfoContiguous[] blocks = new BlockInfoContiguous[bpl.size()];
for(int j = 0, e = bpl.size(); j < e; ++j) {
Block blk = PBHelper.convert(bpl.get(j));
BlockInfoContiguous storedBlock = fsn.getBlockManager().getStoredBlock(blk);
BlockInfoContiguous storedBlock =
(BlockInfoContiguous) fsn.getBlockManager().getStoredBlock(blk);
if(storedBlock == null) {
storedBlock = fsn.getBlockManager().addBlockCollection(
new BlockInfoContiguous(blk, copy.getFileReplication()), file);

View File

@ -1608,8 +1608,8 @@ public class DFSTestUtil {
*/
public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
ExtendedBlock blk) {
BlockManager bm0 = nn.getNamesystem().getBlockManager();
BlockInfoContiguous storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
FSNamesystem fsn = nn.getNamesystem();
BlockInfoContiguous storedBlock = fsn.getStoredBlock(blk.getLocalBlock());
assertTrue("Block " + blk + " should be under construction, " +
"got: " + storedBlock,
storedBlock instanceof BlockInfoContiguousUnderConstruction);

View File

@ -63,7 +63,7 @@ public class TestBlockInfo {
final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
boolean added = blockInfo.addStorage(storage);
boolean added = blockInfo.addStorage(storage, blockInfo);
Assert.assertTrue(added);
Assert.assertEquals(storage, blockInfo.getStorageInfo(0));
@ -129,7 +129,7 @@ public class TestBlockInfo {
// list length should be equal to the number of blocks we inserted
LOG.info("Checking list length...");
assertEquals("Length should be MAX_BLOCK", MAX_BLOCKS, dd.numBlocks());
Iterator<BlockInfoContiguous> it = dd.getBlockIterator();
Iterator<BlockInfo> it = dd.getBlockIterator();
int len = 0;
while (it.hasNext()) {
it.next();
@ -151,7 +151,7 @@ public class TestBlockInfo {
// move head of the list to the head - this should not change the list
LOG.info("Moving head to the head...");
BlockInfoContiguous temp = dd.getBlockListHeadForTesting();
BlockInfo temp = dd.getBlockListHeadForTesting();
curIndex = 0;
headIndex = 0;
dd.moveBlockToHead(temp, curIndex, headIndex);

View File

@ -0,0 +1,219 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
/**
* Test {@link BlockInfoStriped}
*/
public class TestBlockInfoStriped {
private static final int TOTAL_NUM_BLOCKS = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
private static final long BASE_ID = -1600;
private static final Block baseBlock = new Block(BASE_ID);
private BlockInfoStriped info;
@Before
public void setup() {
info = new BlockInfoStriped(baseBlock, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
}
private Block[] createReportedBlocks(int num) {
Block[] blocks = new Block[num];
for (int i = 0; i < num; i++) {
blocks[i] = new Block(BASE_ID + i);
}
return blocks;
}
/**
* Test adding storage and reported block
*/
@Test
public void testAddStorage() {
// first add NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS storages, i.e., a complete
// group of blocks/storages
DatanodeStorageInfo[] storageInfos = DFSTestUtil.createDatanodeStorageInfos(
TOTAL_NUM_BLOCKS);
Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
int i = 0;
for (; i < storageInfos.length; i += 2) {
info.addStorage(storageInfos[i], blocks[i]);
Assert.assertEquals(i/2 + 1, info.numNodes());
}
i /= 2;
for (int j = 1; j < storageInfos.length; j += 2) {
Assert.assertTrue(info.addStorage(storageInfos[j], blocks[j]));
Assert.assertEquals(i + (j+1)/2, info.numNodes());
}
// check
byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices");
Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length);
i = 0;
for (DatanodeStorageInfo storage : storageInfos) {
int index = info.findStorageInfo(storage);
Assert.assertEquals(i++, index);
Assert.assertEquals(index, indices[index]);
}
// the same block is reported from the same storage twice
i = 0;
for (DatanodeStorageInfo storage : storageInfos) {
Assert.assertTrue(info.addStorage(storage, blocks[i++]));
}
Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
Assert.assertEquals(TOTAL_NUM_BLOCKS, info.numNodes());
Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length);
i = 0;
for (DatanodeStorageInfo storage : storageInfos) {
int index = info.findStorageInfo(storage);
Assert.assertEquals(i++, index);
Assert.assertEquals(index, indices[index]);
}
// the same block is reported from another storage
DatanodeStorageInfo[] storageInfos2 = DFSTestUtil.createDatanodeStorageInfos(
TOTAL_NUM_BLOCKS * 2);
// only add the second half of info2
for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) {
info.addStorage(storageInfos2[i], blocks[i % TOTAL_NUM_BLOCKS]);
Assert.assertEquals(i + 1, info.getCapacity());
Assert.assertEquals(i + 1, info.numNodes());
indices = (byte[]) Whitebox.getInternalState(info, "indices");
Assert.assertEquals(i + 1, indices.length);
}
for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) {
int index = info.findStorageInfo(storageInfos2[i]);
Assert.assertEquals(i++, index);
Assert.assertEquals(index - TOTAL_NUM_BLOCKS, indices[index]);
}
}
@Test
public void testRemoveStorage() {
// first add TOTAL_NUM_BLOCKS into the BlockInfoStriped
DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(
TOTAL_NUM_BLOCKS);
Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
for (int i = 0; i < storages.length; i++) {
info.addStorage(storages[i], blocks[i]);
}
// remove two storages
info.removeStorage(storages[0]);
info.removeStorage(storages[2]);
// check
Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes());
byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices");
for (int i = 0; i < storages.length; i++) {
int index = info.findStorageInfo(storages[i]);
if (i != 0 && i != 2) {
Assert.assertEquals(i, index);
Assert.assertEquals(index, indices[index]);
} else {
Assert.assertEquals(-1, index);
Assert.assertEquals(-1, indices[i]);
}
}
// the same block is reported from another storage
DatanodeStorageInfo[] storages2 = DFSTestUtil.createDatanodeStorageInfos(
TOTAL_NUM_BLOCKS * 2);
for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) {
info.addStorage(storages2[i], blocks[i % TOTAL_NUM_BLOCKS]);
}
// now we should have TOTAL_NUM_BLOCKS * 2 - 2 storages
Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.numNodes());
Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity());
indices = (byte[]) Whitebox.getInternalState(info, "indices");
Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length);
int j = TOTAL_NUM_BLOCKS;
for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) {
int index = info.findStorageInfo(storages2[i]);
if (i == TOTAL_NUM_BLOCKS || i == TOTAL_NUM_BLOCKS + 2) {
Assert.assertEquals(i - TOTAL_NUM_BLOCKS, index);
} else {
Assert.assertEquals(j++, index);
}
}
// remove the storages from storages2
for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) {
info.removeStorage(storages2[i + TOTAL_NUM_BLOCKS]);
}
// now we should have TOTAL_NUM_BLOCKS - 2 storages again
Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes());
Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity());
indices = (byte[]) Whitebox.getInternalState(info, "indices");
Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length);
for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) {
if (i == 0 || i == 2) {
int index = info.findStorageInfo(storages2[i + TOTAL_NUM_BLOCKS]);
Assert.assertEquals(-1, index);
} else {
int index = info.findStorageInfo(storages[i]);
Assert.assertEquals(i, index);
}
}
for (int i = TOTAL_NUM_BLOCKS; i < TOTAL_NUM_BLOCKS * 2 - 2; i++) {
Assert.assertEquals(-1, indices[i]);
Assert.assertNull(info.getDatanode(i));
}
}
@Test
public void testReplaceBlock() {
DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(
TOTAL_NUM_BLOCKS);
Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
// add block/storage 0, 2, 4 into the BlockInfoStriped
for (int i = 0; i < storages.length; i += 2) {
Assert.assertEquals(AddBlockResult.ADDED,
storages[i].addBlock(info, blocks[i]));
}
BlockInfoStriped newBlockInfo = new BlockInfoStriped(info);
info.replaceBlock(newBlockInfo);
// make sure the newBlockInfo is correct
byte[] indices = (byte[]) Whitebox.getInternalState(newBlockInfo, "indices");
for (int i = 0; i < storages.length; i += 2) {
int index = newBlockInfo.findStorageInfo(storages[i]);
Assert.assertEquals(i, index);
Assert.assertEquals(index, indices[i]);
// make sure the newBlockInfo is added to the linked list of the storage
Assert.assertSame(newBlockInfo, storages[i].getBlockListHeadForTesting());
Assert.assertEquals(1, storages[i].numBlocks());
Assert.assertNull(newBlockInfo.getNext());
}
}
}
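The tests above hinge on one invariant: when an internal block is reported from an extra storage, the group's capacity grows by one slot while the new slot keeps recording the block's index inside the group. A self-contained sketch with parallel arrays standing in for the triplets and indices fields (a 5-wide group for brevity):

import java.util.*;

class StripedSlotsDemo {
  public static void main(String[] args) {
    List<String>  storages = new ArrayList<>(); // slot -> storage, like the triplets
    List<Integer> indices  = new ArrayList<>(); // slot -> index inside the group

    // a complete 5-wide group: internal block i reported from storage s_i
    for (int i = 0; i < 5; i++) { storages.add("s" + i); indices.add(i); }

    // internal block 2 reported again from an extra storage: the arrays grow
    // by one slot, and the new slot still records in-group index 2
    storages.add("s_extra"); indices.add(2);

    System.out.println(storages.size()); // 6: capacity grew past the group width
    System.out.println(indices.get(5));  // 2
  }
}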

View File

@ -383,7 +383,7 @@ public class TestBlockManager {
for (int i = 1; i < pipeline.length; i++) {
DatanodeStorageInfo storage = pipeline[i];
bm.addBlock(storage, blockInfo, null);
blockInfo.addStorage(storage);
blockInfo.addStorage(storage, blockInfo);
}
}
@ -393,7 +393,7 @@ public class TestBlockManager {
for (DatanodeDescriptor dn : nodes) {
for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
blockInfo.addStorage(storage);
blockInfo.addStorage(storage, blockInfo);
}
}
return blockInfo;

View File

@ -1244,7 +1244,7 @@ public class TestReplicationPolicy {
when(storage.removeBlock(any(BlockInfoContiguous.class))).thenReturn(true);
when(storage.addBlock(any(BlockInfoContiguous.class))).thenReturn
(DatanodeStorageInfo.AddBlockResult.ADDED);
ucBlock.addStorage(storage);
ucBlock.addStorage(storage, ucBlock);
when(mbc.setLastBlock((BlockInfoContiguous) any(), (DatanodeStorageInfo[]) any()))
.thenReturn(ucBlock);