HDFS-8801. Convert BlockInfoUnderConstruction as a feature. Contributed by Jing Zhao.
This commit is contained in:
parent
4e712faac2
commit
55f7097900
|
@ -453,6 +453,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-6407. Add sorting and pagination in the datanode tab of the NN Web UI.
|
||||
(wheat9)
|
||||
|
||||
HDFS-8801. Convert BlockInfoUnderConstruction as a feature.
|
||||
(Jing Zhao via wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -79,8 +79,7 @@ public interface BlockCollection {
|
|||
* Convert the last block of the collection to an under-construction block
|
||||
* and set the locations.
|
||||
*/
|
||||
public BlockInfoContiguousUnderConstruction setLastBlock(BlockInfo lastBlock,
|
||||
DatanodeStorageInfo[] targets) throws IOException;
|
||||
public void convertLastBlockToUC(BlockInfo lastBlock, DatanodeStorageInfo[] targets) throws IOException;
|
||||
|
||||
/**
|
||||
* @return whether the block collection is under construction.
|
||||
|
|
|
@ -17,11 +17,16 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature.ReplicaUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.util.LightWeightGSet;
|
||||
|
||||
/**
|
||||
|
@ -35,6 +40,7 @@ import org.apache.hadoop.util.LightWeightGSet;
|
|||
@InterfaceAudience.Private
|
||||
public abstract class BlockInfo extends Block
|
||||
implements LightWeightGSet.LinkedElement {
|
||||
|
||||
public static final BlockInfo[] EMPTY_ARRAY = {};
|
||||
|
||||
private BlockCollection bc;
|
||||
|
@ -56,6 +62,8 @@ public abstract class BlockInfo extends Block
|
|||
*/
|
||||
protected Object[] triplets;
|
||||
|
||||
private BlockUnderConstructionFeature uc;
|
||||
|
||||
/**
|
||||
* Construct an entry for blocksmap
|
||||
* @param replication the block's replication factor
|
||||
|
@ -184,7 +192,6 @@ public abstract class BlockInfo extends Block
|
|||
*/
|
||||
abstract boolean removeStorage(DatanodeStorageInfo storage);
|
||||
|
||||
|
||||
/**
|
||||
* Replace the current BlockInfo with the new one in corresponding
|
||||
* DatanodeStorageInfo's linked list
|
||||
|
@ -295,47 +302,6 @@ public abstract class BlockInfo extends Block
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* BlockInfo represents a block that is not being constructed.
|
||||
* In order to start modifying the block, the BlockInfo should be converted
|
||||
* to {@link BlockInfoContiguousUnderConstruction}.
|
||||
* @return {@link BlockUCState#COMPLETE}
|
||||
*/
|
||||
public BlockUCState getBlockUCState() {
|
||||
return BlockUCState.COMPLETE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this block complete?
|
||||
*
|
||||
* @return true if the state of the block is {@link BlockUCState#COMPLETE}
|
||||
*/
|
||||
public boolean isComplete() {
|
||||
return getBlockUCState().equals(BlockUCState.COMPLETE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a complete block to an under construction block.
|
||||
* @return BlockInfoUnderConstruction - an under construction block.
|
||||
*/
|
||||
public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction(
|
||||
BlockUCState s, DatanodeStorageInfo[] targets) {
|
||||
if(isComplete()) {
|
||||
BlockInfoContiguousUnderConstruction ucBlock =
|
||||
new BlockInfoContiguousUnderConstruction(this,
|
||||
getBlockCollection().getPreferredBlockReplication(), s, targets);
|
||||
ucBlock.setBlockCollection(getBlockCollection());
|
||||
return ucBlock;
|
||||
}
|
||||
// the block is already under construction
|
||||
BlockInfoContiguousUnderConstruction ucBlock =
|
||||
(BlockInfoContiguousUnderConstruction)this;
|
||||
ucBlock.setBlockUCState(s);
|
||||
ucBlock.setExpectedLocations(targets);
|
||||
ucBlock.setBlockCollection(getBlockCollection());
|
||||
return ucBlock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
// Super implementation is sufficient
|
||||
|
@ -357,4 +323,90 @@ public abstract class BlockInfo extends Block
|
|||
public void setNext(LightWeightGSet.LinkedElement next) {
|
||||
this.nextLinkedElement = next;
|
||||
}
|
||||
|
||||
/* UnderConstruction Feature related */
|
||||
|
||||
public BlockUnderConstructionFeature getUnderConstructionFeature() {
|
||||
return uc;
|
||||
}
|
||||
|
||||
public BlockUCState getBlockUCState() {
|
||||
return uc == null ? BlockUCState.COMPLETE : uc.getBlockUCState();
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this block complete?
|
||||
*
|
||||
* @return true if the state of the block is {@link BlockUCState#COMPLETE}
|
||||
*/
|
||||
public boolean isComplete() {
|
||||
return getBlockUCState().equals(BlockUCState.COMPLETE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add/Update the under construction feature.
|
||||
*/
|
||||
public void convertToBlockUnderConstruction(BlockUCState s,
|
||||
DatanodeStorageInfo[] targets) {
|
||||
if (isComplete()) {
|
||||
uc = new BlockUnderConstructionFeature(this, s, targets);
|
||||
} else {
|
||||
// the block is already under construction
|
||||
uc.setBlockUCState(s);
|
||||
uc.setExpectedLocations(this.getGenerationStamp(), targets);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an under construction block to a complete block.
|
||||
*
|
||||
* @return BlockInfo - a complete block.
|
||||
* @throws IOException if the state of the block
|
||||
* (the generation stamp and the length) has not been committed by
|
||||
* the client or it does not have at least a minimal number of replicas
|
||||
* reported from data-nodes.
|
||||
*/
|
||||
BlockInfo convertToCompleteBlock() throws IOException {
|
||||
assert getBlockUCState() != BlockUCState.COMPLETE :
|
||||
"Trying to convert a COMPLETE block";
|
||||
uc = null;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the recorded replicas. When about to commit or finish the
|
||||
* pipeline recovery sort out bad replicas.
|
||||
* @param genStamp The final generation stamp for the block.
|
||||
*/
|
||||
public void setGenerationStampAndVerifyReplicas(long genStamp) {
|
||||
Preconditions.checkState(uc != null && !isComplete());
|
||||
// Set the generation stamp for the block.
|
||||
setGenerationStamp(genStamp);
|
||||
|
||||
// Remove the replicas with wrong gen stamp
|
||||
List<ReplicaUnderConstruction> staleReplicas = uc.getStaleReplicas(genStamp);
|
||||
for (ReplicaUnderConstruction r : staleReplicas) {
|
||||
r.getExpectedStorageLocation().removeBlock(this);
|
||||
NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica "
|
||||
+ "from location: {}", r.getExpectedStorageLocation());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Commit block's length and generation stamp as reported by the client.
|
||||
* Set block state to {@link BlockUCState#COMMITTED}.
|
||||
* @param block - contains client reported block length and generation
|
||||
* @throws IOException if block ids are inconsistent.
|
||||
*/
|
||||
void commitBlock(Block block) throws IOException {
|
||||
if (getBlockId() != block.getBlockId()) {
|
||||
throw new IOException("Trying to commit inconsistent block: id = "
|
||||
+ block.getBlockId() + ", expected id = " + getBlockId());
|
||||
}
|
||||
Preconditions.checkState(!isComplete());
|
||||
uc.commit();
|
||||
this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
|
||||
// Sort out invalid replicas.
|
||||
setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -609,9 +608,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
* @throws IOException if the block does not have at least a minimal number
|
||||
* of replicas reported from data-nodes.
|
||||
*/
|
||||
private static boolean commitBlock(
|
||||
final BlockInfoContiguousUnderConstruction block, final Block commitBlock)
|
||||
throws IOException {
|
||||
private static boolean commitBlock(final BlockInfo block,
|
||||
final Block commitBlock) throws IOException {
|
||||
if (block.getBlockUCState() == BlockUCState.COMMITTED)
|
||||
return false;
|
||||
assert block.getNumBytes() <= commitBlock.getNumBytes() :
|
||||
|
@ -641,8 +639,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
if(lastBlock.isComplete())
|
||||
return false; // already completed (e.g. by syncBlock)
|
||||
|
||||
final boolean b = commitBlock(
|
||||
(BlockInfoContiguousUnderConstruction) lastBlock, commitBlock);
|
||||
final boolean b = commitBlock(lastBlock, commitBlock);
|
||||
if(countNodes(lastBlock).liveReplicas() >= minReplication)
|
||||
completeBlock(bc, bc.numBlocks()-1, false);
|
||||
return b;
|
||||
|
@ -662,16 +659,15 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
BlockInfo curBlock = bc.getBlocks()[blkIndex];
|
||||
if(curBlock.isComplete())
|
||||
return curBlock;
|
||||
BlockInfoContiguousUnderConstruction ucBlock =
|
||||
(BlockInfoContiguousUnderConstruction) curBlock;
|
||||
int numNodes = ucBlock.numNodes();
|
||||
|
||||
int numNodes = curBlock.numNodes();
|
||||
if (!force && numNodes < minReplication)
|
||||
throw new IOException("Cannot complete block: " +
|
||||
"block does not satisfy minimal replication requirement.");
|
||||
if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
|
||||
if(!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED)
|
||||
throw new IOException(
|
||||
"Cannot complete block: block has not been COMMITTED by the client");
|
||||
BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
|
||||
BlockInfo completeBlock = curBlock.convertToCompleteBlock();
|
||||
// replace penultimate block in file
|
||||
bc.setBlock(blkIndex, completeBlock);
|
||||
|
||||
|
@ -705,7 +701,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
* when tailing edit logs as a Standby.
|
||||
*/
|
||||
public BlockInfo forceCompleteBlock(final BlockCollection bc,
|
||||
final BlockInfoContiguousUnderConstruction block) throws IOException {
|
||||
final BlockInfo block) throws IOException {
|
||||
block.commitBlock(block);
|
||||
return completeBlock(bc, block, true);
|
||||
}
|
||||
|
@ -727,28 +723,29 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
*/
|
||||
public LocatedBlock convertLastBlockToUnderConstruction(
|
||||
BlockCollection bc, long bytesToRemove) throws IOException {
|
||||
BlockInfo oldBlock = bc.getLastBlock();
|
||||
if(oldBlock == null ||
|
||||
bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove)
|
||||
BlockInfo lastBlock = bc.getLastBlock();
|
||||
if (lastBlock == null ||
|
||||
bc.getPreferredBlockSize() == lastBlock.getNumBytes() - bytesToRemove) {
|
||||
return null;
|
||||
assert oldBlock == getStoredBlock(oldBlock) :
|
||||
}
|
||||
assert lastBlock == getStoredBlock(lastBlock) :
|
||||
"last block of the file is not in blocksMap";
|
||||
|
||||
DatanodeStorageInfo[] targets = getStorages(oldBlock);
|
||||
DatanodeStorageInfo[] targets = getStorages(lastBlock);
|
||||
|
||||
BlockInfoContiguousUnderConstruction ucBlock =
|
||||
bc.setLastBlock(oldBlock, targets);
|
||||
blocksMap.replaceBlock(ucBlock);
|
||||
// convert the last block to under construction. note no block replacement
|
||||
// is happening
|
||||
bc.convertLastBlockToUC(lastBlock, targets);
|
||||
|
||||
// Remove block from replication queue.
|
||||
NumberReplicas replicas = countNodes(ucBlock);
|
||||
neededReplications.remove(ucBlock, replicas.liveReplicas(),
|
||||
replicas.decommissionedAndDecommissioning(), getReplication(ucBlock));
|
||||
pendingReplications.remove(ucBlock);
|
||||
NumberReplicas replicas = countNodes(lastBlock);
|
||||
neededReplications.remove(lastBlock, replicas.liveReplicas(),
|
||||
replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
|
||||
pendingReplications.remove(lastBlock);
|
||||
|
||||
// remove this block from the list of pending blocks to be deleted.
|
||||
for (DatanodeStorageInfo storage : targets) {
|
||||
invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
|
||||
invalidateBlocks.remove(storage.getDatanodeDescriptor(), lastBlock);
|
||||
}
|
||||
|
||||
// Adjust safe-mode totals, since under-construction blocks don't
|
||||
|
@ -759,9 +756,11 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
// always decrement total blocks
|
||||
-1);
|
||||
|
||||
final long fileLength = bc.computeContentSummary(getStoragePolicySuite()).getLength();
|
||||
final long pos = fileLength - ucBlock.getNumBytes();
|
||||
return createLocatedBlock(ucBlock, pos, BlockTokenIdentifier.AccessMode.WRITE);
|
||||
final long fileLength = bc.computeContentSummary(
|
||||
getStoragePolicySuite()).getLength();
|
||||
final long pos = fileLength - lastBlock.getNumBytes();
|
||||
return createLocatedBlock(lastBlock, pos,
|
||||
BlockTokenIdentifier.AccessMode.WRITE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -838,15 +837,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
/** @return a LocatedBlock for the given block */
|
||||
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
|
||||
) throws IOException {
|
||||
if (blk instanceof BlockInfoContiguousUnderConstruction) {
|
||||
if (blk.isComplete()) {
|
||||
throw new IOException(
|
||||
"blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
|
||||
+ ", blk=" + blk);
|
||||
}
|
||||
final BlockInfoContiguousUnderConstruction uc =
|
||||
(BlockInfoContiguousUnderConstruction) blk;
|
||||
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
|
||||
if (!blk.isComplete()) {
|
||||
final DatanodeStorageInfo[] storages = blk.getUnderConstructionFeature()
|
||||
.getExpectedStorageLocations();
|
||||
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
|
||||
return newLocatedBlock(eb, storages, pos, false);
|
||||
}
|
||||
|
@ -1750,12 +1743,13 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
* reported by the datanode in the block report.
|
||||
*/
|
||||
static class StatefulBlockInfo {
|
||||
final BlockInfoContiguousUnderConstruction storedBlock;
|
||||
final BlockInfo storedBlock;
|
||||
final Block reportedBlock;
|
||||
final ReplicaState reportedState;
|
||||
|
||||
StatefulBlockInfo(BlockInfoContiguousUnderConstruction storedBlock,
|
||||
StatefulBlockInfo(BlockInfo storedBlock,
|
||||
Block reportedBlock, ReplicaState reportedState) {
|
||||
Preconditions.checkArgument(!storedBlock.isComplete());
|
||||
this.storedBlock = storedBlock;
|
||||
this.reportedBlock = reportedBlock;
|
||||
this.reportedState = reportedState;
|
||||
|
@ -2154,15 +2148,14 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
|
||||
// If block is under construction, add this replica to its list
|
||||
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
||||
((BlockInfoContiguousUnderConstruction)storedBlock)
|
||||
storedBlock.getUnderConstructionFeature()
|
||||
.addReplicaIfNotPresent(storageInfo, iblk, reportedState);
|
||||
// OpenFileBlocks only inside snapshots also will be added to safemode
|
||||
// threshold. So we need to update such blocks to safemode
|
||||
// refer HDFS-5283
|
||||
BlockInfoContiguousUnderConstruction blockUC =
|
||||
(BlockInfoContiguousUnderConstruction) storedBlock;
|
||||
if (namesystem.isInSnapshot(blockUC)) {
|
||||
int numOfReplicas = blockUC.getNumExpectedLocations();
|
||||
if (namesystem.isInSnapshot(storedBlock)) {
|
||||
int numOfReplicas = storedBlock.getUnderConstructionFeature()
|
||||
.getNumExpectedLocations();
|
||||
namesystem.incrementSafeBlockCount(numOfReplicas);
|
||||
}
|
||||
//and fall through to next clause
|
||||
|
@ -2287,11 +2280,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
|
||||
// Ignore replicas already scheduled to be removed from the DN
|
||||
if(invalidateBlocks.contains(dn, block)) {
|
||||
/*
|
||||
* TODO: following assertion is incorrect, see HDFS-2668 assert
|
||||
* storedBlock.findDatanode(dn) < 0 : "Block " + block +
|
||||
* " in recentInvalidatesSet should not appear in DN " + dn;
|
||||
*/
|
||||
return storedBlock;
|
||||
}
|
||||
|
||||
|
@ -2314,8 +2302,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
}
|
||||
|
||||
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
||||
toUC.add(new StatefulBlockInfo(
|
||||
(BlockInfoContiguousUnderConstruction) storedBlock,
|
||||
toUC.add(new StatefulBlockInfo(storedBlock,
|
||||
new Block(block), reportedState));
|
||||
return storedBlock;
|
||||
}
|
||||
|
@ -2506,8 +2493,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
|
||||
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
|
||||
DatanodeStorageInfo storageInfo) throws IOException {
|
||||
BlockInfoContiguousUnderConstruction block = ucBlock.storedBlock;
|
||||
block.addReplicaIfNotPresent(
|
||||
BlockInfo block = ucBlock.storedBlock;
|
||||
block.getUnderConstructionFeature().addReplicaIfNotPresent(
|
||||
storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
|
||||
|
||||
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
|
||||
|
@ -2567,7 +2554,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
assert block != null && namesystem.hasWriteLock();
|
||||
BlockInfo storedBlock;
|
||||
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
||||
if (block instanceof BlockInfoContiguousUnderConstruction) {
|
||||
if (!block.isComplete()) {
|
||||
//refresh our copy in case the block got completed in another thread
|
||||
storedBlock = blocksMap.getStoredBlock(block);
|
||||
} else {
|
||||
|
@ -3522,11 +3509,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
String src, BlockInfo[] blocks) {
|
||||
for (BlockInfo b: blocks) {
|
||||
if (!b.isComplete()) {
|
||||
final BlockInfoContiguousUnderConstruction uc =
|
||||
(BlockInfoContiguousUnderConstruction)b;
|
||||
final int numNodes = b.numNodes();
|
||||
LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = "
|
||||
+ uc.getBlockUCState() + ", replication# = " + numNodes
|
||||
+ b.getBlockUCState() + ", replication# = " + numNodes
|
||||
+ (numNodes < minReplication ? " < ": " >= ")
|
||||
+ " minimum = " + minReplication + ") in file " + src);
|
||||
return false;
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -31,7 +30,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|||
* Represents a block that is currently being constructed.<br>
|
||||
* This is usually the last block of a file opened for write or append.
|
||||
*/
|
||||
public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
|
||||
public class BlockUnderConstructionFeature {
|
||||
/** Block state. See {@link BlockUCState} */
|
||||
private BlockUCState blockUCState;
|
||||
|
||||
|
@ -62,32 +61,40 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
|
|||
/**
|
||||
* ReplicaUnderConstruction contains information about replicas while
|
||||
* they are under construction.
|
||||
* The GS, the length and the state of the replica is as reported by
|
||||
* The GS, the length and the state of the replica is as reported by
|
||||
* the data-node.
|
||||
* It is not guaranteed, but expected, that data-nodes actually have
|
||||
* corresponding replicas.
|
||||
*/
|
||||
static class ReplicaUnderConstruction extends Block {
|
||||
static class ReplicaUnderConstruction {
|
||||
private long generationStamp;
|
||||
private final DatanodeStorageInfo expectedLocation;
|
||||
private ReplicaState state;
|
||||
private boolean chosenAsPrimary;
|
||||
|
||||
ReplicaUnderConstruction(Block block,
|
||||
DatanodeStorageInfo target,
|
||||
ReplicaState state) {
|
||||
super(block);
|
||||
ReplicaUnderConstruction(long generationStamp, DatanodeStorageInfo target,
|
||||
ReplicaState state) {
|
||||
this.generationStamp = generationStamp;
|
||||
this.expectedLocation = target;
|
||||
this.state = state;
|
||||
this.chosenAsPrimary = false;
|
||||
}
|
||||
|
||||
long getGenerationStamp() {
|
||||
return this.generationStamp;
|
||||
}
|
||||
|
||||
void setGenerationStamp(long generationStamp) {
|
||||
this.generationStamp = generationStamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Expected block replica location as assigned when the block was allocated.
|
||||
* This defines the pipeline order.
|
||||
* It is not guaranteed, but expected, that the data-node actually has
|
||||
* the replica.
|
||||
*/
|
||||
private DatanodeStorageInfo getExpectedStorageLocation() {
|
||||
DatanodeStorageInfo getExpectedStorageLocation() {
|
||||
return expectedLocation;
|
||||
}
|
||||
|
||||
|
@ -126,76 +133,38 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
|
|||
return expectedLocation.getDatanodeDescriptor().isAlive;
|
||||
}
|
||||
|
||||
@Override // Block
|
||||
public int hashCode() {
|
||||
return super.hashCode();
|
||||
}
|
||||
|
||||
@Override // Block
|
||||
public boolean equals(Object obj) {
|
||||
// Sufficient to rely on super's implementation
|
||||
return (this == obj) || super.equals(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder b = new StringBuilder(50);
|
||||
appendStringTo(b);
|
||||
final StringBuilder b = new StringBuilder(50)
|
||||
.append("ReplicaUC[")
|
||||
.append(expectedLocation)
|
||||
.append("|")
|
||||
.append(state)
|
||||
.append("]");
|
||||
return b.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendStringTo(StringBuilder sb) {
|
||||
sb.append("ReplicaUC[")
|
||||
.append(expectedLocation)
|
||||
.append("|")
|
||||
.append(state)
|
||||
.append("]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create block and set its state to
|
||||
* {@link BlockUCState#UNDER_CONSTRUCTION}.
|
||||
*/
|
||||
public BlockInfoContiguousUnderConstruction(Block blk, short replication) {
|
||||
this(blk, replication, BlockUCState.UNDER_CONSTRUCTION, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a block that is currently being constructed.
|
||||
*/
|
||||
public BlockInfoContiguousUnderConstruction(Block blk, short replication,
|
||||
BlockUCState state, DatanodeStorageInfo[] targets) {
|
||||
super(blk, replication);
|
||||
public BlockUnderConstructionFeature(Block block, BlockUCState state,
|
||||
DatanodeStorageInfo[] targets) {
|
||||
assert getBlockUCState() != BlockUCState.COMPLETE :
|
||||
"BlockInfoUnderConstruction cannot be in COMPLETE state";
|
||||
this.blockUCState = state;
|
||||
setExpectedLocations(targets);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an under construction block to a complete block.
|
||||
*
|
||||
* @return BlockInfo - a complete block.
|
||||
* @throws IOException if the state of the block
|
||||
* (the generation stamp and the length) has not been committed by
|
||||
* the client or it does not have at least a minimal number of replicas
|
||||
* reported from data-nodes.
|
||||
*/
|
||||
BlockInfo convertToCompleteBlock() throws IOException {
|
||||
assert getBlockUCState() != BlockUCState.COMPLETE :
|
||||
"Trying to convert a COMPLETE block";
|
||||
return new BlockInfoContiguous(this);
|
||||
setExpectedLocations(block.getGenerationStamp(), targets);
|
||||
}
|
||||
|
||||
/** Set expected locations */
|
||||
public void setExpectedLocations(DatanodeStorageInfo[] targets) {
|
||||
public void setExpectedLocations(long generationStamp,
|
||||
DatanodeStorageInfo[] targets) {
|
||||
int numLocations = targets == null ? 0 : targets.length;
|
||||
this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
|
||||
for(int i = 0; i < numLocations; i++)
|
||||
replicas.add(
|
||||
new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
|
||||
this.replicas = new ArrayList<>(numLocations);
|
||||
for(int i = 0; i < numLocations; i++) {
|
||||
replicas.add(new ReplicaUnderConstruction(generationStamp, targets[i],
|
||||
ReplicaState.RBW));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -205,8 +174,9 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
|
|||
public DatanodeStorageInfo[] getExpectedStorageLocations() {
|
||||
int numLocations = replicas == null ? 0 : replicas.size();
|
||||
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
|
||||
for(int i = 0; i < numLocations; i++)
|
||||
for (int i = 0; i < numLocations; i++) {
|
||||
storages[i] = replicas.get(i).getExpectedStorageLocation();
|
||||
}
|
||||
return storages;
|
||||
}
|
||||
|
||||
|
@ -219,7 +189,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
|
|||
* Return the state of the block under construction.
|
||||
* @see BlockUCState
|
||||
*/
|
||||
@Override // BlockInfo
|
||||
public BlockUCState getBlockUCState() {
|
||||
return blockUCState;
|
||||
}
|
||||
|
@ -243,41 +212,23 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
|
|||
}
|
||||
|
||||
/**
|
||||
* Process the recorded replicas. When about to commit or finish the
|
||||
* pipeline recovery sort out bad replicas.
|
||||
* @param genStamp The final generation stamp for the block.
|
||||
* Set {@link #blockUCState} to {@link BlockUCState#COMMITTED}.
|
||||
*/
|
||||
public void setGenerationStampAndVerifyReplicas(long genStamp) {
|
||||
// Set the generation stamp for the block.
|
||||
setGenerationStamp(genStamp);
|
||||
if (replicas == null)
|
||||
return;
|
||||
|
||||
// Remove the replicas with wrong gen stamp.
|
||||
// The replica list is unchanged.
|
||||
for (ReplicaUnderConstruction r : replicas) {
|
||||
if (genStamp != r.getGenerationStamp()) {
|
||||
r.getExpectedStorageLocation().removeBlock(this);
|
||||
NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica "
|
||||
+ "from location: {}", r.getExpectedStorageLocation());
|
||||
}
|
||||
}
|
||||
void commit() {
|
||||
blockUCState = BlockUCState.COMMITTED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Commit block's length and generation stamp as reported by the client.
|
||||
* Set block state to {@link BlockUCState#COMMITTED}.
|
||||
* @param block - contains client reported block length and generation
|
||||
* @throws IOException if block ids are inconsistent.
|
||||
*/
|
||||
void commitBlock(Block block) throws IOException {
|
||||
if(getBlockId() != block.getBlockId())
|
||||
throw new IOException("Trying to commit inconsistent block: id = "
|
||||
+ block.getBlockId() + ", expected id = " + getBlockId());
|
||||
blockUCState = BlockUCState.COMMITTED;
|
||||
this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
|
||||
// Sort out invalid replicas.
|
||||
setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
|
||||
List<ReplicaUnderConstruction> getStaleReplicas(long genStamp) {
|
||||
List<ReplicaUnderConstruction> staleReplicas = new ArrayList<>();
|
||||
if (replicas != null) {
|
||||
// Remove replicas with wrong gen stamp. The replica list is unchanged.
|
||||
for (ReplicaUnderConstruction r : replicas) {
|
||||
if (genStamp != r.getGenerationStamp()) {
|
||||
staleReplicas.add(r);
|
||||
}
|
||||
}
|
||||
}
|
||||
return staleReplicas;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -285,7 +236,7 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
|
|||
* Find the first alive data-node starting from the previous primary and
|
||||
* make it primary.
|
||||
*/
|
||||
public void initializeBlockRecovery(long recoveryId) {
|
||||
public void initializeBlockRecovery(BlockInfo block, long recoveryId) {
|
||||
setBlockUCState(BlockUCState.UNDER_RECOVERY);
|
||||
blockRecoveryId = recoveryId;
|
||||
if (replicas.size() == 0) {
|
||||
|
@ -294,17 +245,17 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
|
|||
+ " No blocks found, lease removed.");
|
||||
}
|
||||
boolean allLiveReplicasTriedAsPrimary = true;
|
||||
for (int i = 0; i < replicas.size(); i++) {
|
||||
for (ReplicaUnderConstruction replica : replicas) {
|
||||
// Check if all replicas have been tried or not.
|
||||
if (replicas.get(i).isAlive()) {
|
||||
allLiveReplicasTriedAsPrimary =
|
||||
(allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
|
||||
if (replica.isAlive()) {
|
||||
allLiveReplicasTriedAsPrimary = allLiveReplicasTriedAsPrimary
|
||||
&& replica.getChosenAsPrimary();
|
||||
}
|
||||
}
|
||||
if (allLiveReplicasTriedAsPrimary) {
|
||||
// Just set all the replicas to be chosen whether they are alive or not.
|
||||
for (int i = 0; i < replicas.size(); i++) {
|
||||
replicas.get(i).setChosenAsPrimary(false);
|
||||
for (ReplicaUnderConstruction replica : replicas) {
|
||||
replica.setChosenAsPrimary(false);
|
||||
}
|
||||
}
|
||||
long mostRecentLastUpdate = 0;
|
||||
|
@ -325,28 +276,27 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
|
|||
}
|
||||
}
|
||||
if (primary != null) {
|
||||
primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
|
||||
primary.getExpectedStorageLocation().getDatanodeDescriptor()
|
||||
.addBlockToBeRecovered(block);
|
||||
primary.setChosenAsPrimary(true);
|
||||
NameNode.blockStateChangeLog.debug(
|
||||
"BLOCK* {} recovery started, primary={}", this, primary);
|
||||
}
|
||||
}
|
||||
|
||||
void addReplicaIfNotPresent(DatanodeStorageInfo storage,
|
||||
Block block,
|
||||
ReplicaState rState) {
|
||||
void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block block,
|
||||
ReplicaState rState) {
|
||||
Iterator<ReplicaUnderConstruction> it = replicas.iterator();
|
||||
while (it.hasNext()) {
|
||||
ReplicaUnderConstruction r = it.next();
|
||||
DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation();
|
||||
if(expectedLocation == storage) {
|
||||
if (expectedLocation == storage) {
|
||||
// Record the gen stamp from the report
|
||||
r.setGenerationStamp(block.getGenerationStamp());
|
||||
return;
|
||||
} else if (expectedLocation != null &&
|
||||
expectedLocation.getDatanodeDescriptor() ==
|
||||
storage.getDatanodeDescriptor()) {
|
||||
|
||||
// The Datanode reported that the block is on a different storage
|
||||
// than the one chosen by BlockPlacementPolicy. This can occur as
|
||||
// we allow Datanodes to choose the target storage. Update our
|
||||
|
@ -355,46 +305,28 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
|
|||
break;
|
||||
}
|
||||
}
|
||||
replicas.add(new ReplicaUnderConstruction(block, storage, rState));
|
||||
}
|
||||
|
||||
@Override // BlockInfo
|
||||
// BlockInfoUnderConstruction participates in maps the same way as BlockInfo
|
||||
public int hashCode() {
|
||||
return super.hashCode();
|
||||
}
|
||||
|
||||
@Override // BlockInfo
|
||||
public boolean equals(Object obj) {
|
||||
// Sufficient to rely on super's implementation
|
||||
return (this == obj) || super.equals(obj);
|
||||
replicas.add(new ReplicaUnderConstruction(block.getGenerationStamp(), storage, rState));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder b = new StringBuilder(100);
|
||||
appendStringTo(b);
|
||||
appendUCParts(b);
|
||||
return b.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendStringTo(StringBuilder sb) {
|
||||
super.appendStringTo(sb);
|
||||
appendUCParts(sb);
|
||||
}
|
||||
|
||||
private void appendUCParts(StringBuilder sb) {
|
||||
sb.append("{UCState=").append(blockUCState)
|
||||
.append(", truncateBlock=" + truncateBlock)
|
||||
.append(", truncateBlock=").append(truncateBlock)
|
||||
.append(", primaryNodeIndex=").append(primaryNodeIndex)
|
||||
.append(", replicas=[");
|
||||
if (replicas != null) {
|
||||
Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
|
||||
if (iter.hasNext()) {
|
||||
iter.next().appendStringTo(sb);
|
||||
sb.append(iter.next());
|
||||
while (iter.hasNext()) {
|
||||
sb.append(", ");
|
||||
iter.next().appendStringTo(sb);
|
||||
sb.append(iter.next());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -224,8 +224,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
private final BlockQueue<BlockTargetPair> replicateBlocks =
|
||||
new BlockQueue<>();
|
||||
/** A queue of blocks to be recovered by this datanode */
|
||||
private final BlockQueue<BlockInfoContiguousUnderConstruction> recoverBlocks =
|
||||
new BlockQueue<BlockInfoContiguousUnderConstruction>();
|
||||
private final BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>();
|
||||
/** A set of blocks to be invalidated by this datanode */
|
||||
private final LightWeightHashSet<Block> invalidateBlocks =
|
||||
new LightWeightHashSet<>();
|
||||
|
@ -604,7 +603,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
/**
|
||||
* Store block recovery work.
|
||||
*/
|
||||
void addBlockToBeRecovered(BlockInfoContiguousUnderConstruction block) {
|
||||
void addBlockToBeRecovered(BlockInfo block) {
|
||||
if(recoverBlocks.contains(block)) {
|
||||
// this prevents adding the same block twice to the recovery queue
|
||||
BlockManager.LOG.info(block + " is already in the recovery queue");
|
||||
|
@ -646,11 +645,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
return replicateBlocks.poll(maxTransfers);
|
||||
}
|
||||
|
||||
public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
|
||||
List<BlockInfoContiguousUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
|
||||
public BlockInfo[] getLeaseRecoveryCommand(int maxTransfers) {
|
||||
List<BlockInfo> blocks = recoverBlocks.poll(maxTransfers);
|
||||
if(blocks == null)
|
||||
return null;
|
||||
return blocks.toArray(new BlockInfoContiguousUnderConstruction[blocks.size()]);
|
||||
return blocks.toArray(new BlockInfo[blocks.size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1379,13 +1379,14 @@ public class DatanodeManager {
|
|||
}
|
||||
|
||||
//check lease recovery
|
||||
BlockInfoContiguousUnderConstruction[] blocks = nodeinfo
|
||||
.getLeaseRecoveryCommand(Integer.MAX_VALUE);
|
||||
BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
|
||||
if (blocks != null) {
|
||||
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
|
||||
blocks.length);
|
||||
for (BlockInfoContiguousUnderConstruction b : blocks) {
|
||||
final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
|
||||
for (BlockInfo b : blocks) {
|
||||
BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
|
||||
assert uc != null;
|
||||
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
|
||||
// Skip stale nodes during recovery - not heart beated for some time (30s by default).
|
||||
final List<DatanodeStorageInfo> recoveryLocations =
|
||||
new ArrayList<>(storages.length);
|
||||
|
@ -1396,11 +1397,11 @@ public class DatanodeManager {
|
|||
}
|
||||
// If we are performing a truncate recovery than set recovery fields
|
||||
// to old block.
|
||||
boolean truncateRecovery = b.getTruncateBlock() != null;
|
||||
boolean truncateRecovery = uc.getTruncateBlock() != null;
|
||||
boolean copyOnTruncateRecovery = truncateRecovery &&
|
||||
b.getTruncateBlock().getBlockId() != b.getBlockId();
|
||||
uc.getTruncateBlock().getBlockId() != b.getBlockId();
|
||||
ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
|
||||
new ExtendedBlock(blockPoolId, b.getTruncateBlock()) :
|
||||
new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
|
||||
new ExtendedBlock(blockPoolId, b);
|
||||
// If we only get 1 replica after eliminating stale nodes, then choose all
|
||||
// replicas for recovery and let the primary data node handle failures.
|
||||
|
@ -1419,12 +1420,12 @@ public class DatanodeManager {
|
|||
}
|
||||
if(truncateRecovery) {
|
||||
Block recoveryBlock = (copyOnTruncateRecovery) ? b :
|
||||
b.getTruncateBlock();
|
||||
uc.getTruncateBlock();
|
||||
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
|
||||
recoveryBlock));
|
||||
} else {
|
||||
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
|
||||
b.getBlockRecoveryId()));
|
||||
uc.getBlockRecoveryId()));
|
||||
}
|
||||
}
|
||||
return new DatanodeCommand[] { brCommand };
|
||||
|
|
|
@ -28,8 +28,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||
|
@ -95,7 +96,7 @@ final class FSDirTruncateOp {
|
|||
final BlockInfo last = file.getLastBlock();
|
||||
if (last != null && last.getBlockUCState()
|
||||
== BlockUCState.UNDER_RECOVERY) {
|
||||
final Block truncatedBlock = ((BlockInfoContiguousUnderConstruction) last)
|
||||
final Block truncatedBlock = last.getUnderConstructionFeature()
|
||||
.getTruncateBlock();
|
||||
if (truncatedBlock != null) {
|
||||
final long truncateLength = file.computeFileSize(false, false)
|
||||
|
@ -222,42 +223,42 @@ final class FSDirTruncateOp {
|
|||
oldBlock)));
|
||||
}
|
||||
|
||||
BlockInfoContiguousUnderConstruction truncatedBlockUC;
|
||||
final BlockInfo truncatedBlockUC;
|
||||
BlockManager blockManager = fsn.getFSDirectory().getBlockManager();
|
||||
if (shouldCopyOnTruncate) {
|
||||
// Add new truncateBlock into blocksMap and
|
||||
// use oldBlock as a source for copy-on-truncate recovery
|
||||
truncatedBlockUC = new BlockInfoContiguousUnderConstruction(newBlock,
|
||||
truncatedBlockUC = new BlockInfoContiguous(newBlock,
|
||||
file.getPreferredBlockReplication());
|
||||
truncatedBlockUC.convertToBlockUnderConstruction(
|
||||
BlockUCState.UNDER_CONSTRUCTION, blockManager.getStorages(oldBlock));
|
||||
truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
|
||||
truncatedBlockUC.setTruncateBlock(oldBlock);
|
||||
file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
|
||||
truncatedBlockUC.getUnderConstructionFeature().setTruncateBlock(oldBlock);
|
||||
file.setLastBlock(truncatedBlockUC);
|
||||
blockManager.addBlockCollection(truncatedBlockUC, file);
|
||||
|
||||
NameNode.stateChangeLog.debug(
|
||||
"BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new"
|
||||
+ " size {} new block {} old block {}",
|
||||
truncatedBlockUC.getNumBytes(), newBlock,
|
||||
truncatedBlockUC.getTruncateBlock());
|
||||
truncatedBlockUC.getNumBytes(), newBlock, oldBlock);
|
||||
} else {
|
||||
// Use new generation stamp for in-place truncate recovery
|
||||
blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
|
||||
oldBlock = file.getLastBlock();
|
||||
assert !oldBlock.isComplete() : "oldBlock should be under construction";
|
||||
truncatedBlockUC = (BlockInfoContiguousUnderConstruction) oldBlock;
|
||||
truncatedBlockUC.setTruncateBlock(new Block(oldBlock));
|
||||
truncatedBlockUC.getTruncateBlock().setNumBytes(
|
||||
oldBlock.getNumBytes() - lastBlockDelta);
|
||||
truncatedBlockUC.getTruncateBlock().setGenerationStamp(
|
||||
newBlock.getGenerationStamp());
|
||||
BlockUnderConstructionFeature uc = oldBlock.getUnderConstructionFeature();
|
||||
uc.setTruncateBlock(new Block(oldBlock));
|
||||
uc.getTruncateBlock().setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
|
||||
uc.getTruncateBlock().setGenerationStamp(newBlock.getGenerationStamp());
|
||||
truncatedBlockUC = oldBlock;
|
||||
|
||||
NameNode.stateChangeLog.debug(
|
||||
"BLOCK* prepareFileForTruncate: {} Scheduling in-place block "
|
||||
+ "truncate to new size {}", truncatedBlockUC.getTruncateBlock()
|
||||
.getNumBytes(), truncatedBlockUC);
|
||||
NameNode.stateChangeLog.debug("BLOCK* prepareFileForTruncate: " +
|
||||
"{} Scheduling in-place block truncate to new size {}",
|
||||
uc, uc.getTruncateBlock().getNumBytes());
|
||||
}
|
||||
if (shouldRecoverNow) {
|
||||
truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp());
|
||||
truncatedBlockUC.getUnderConstructionFeature().initializeBlockRecovery(
|
||||
truncatedBlockUC, newBlock.getGenerationStamp());
|
||||
}
|
||||
|
||||
return newBlock;
|
||||
|
|
|
@ -43,8 +43,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||
|
@ -73,7 +74,7 @@ class FSDirWriteFileOp {
|
|||
Block block) throws IOException {
|
||||
// modify file-> block and blocksMap
|
||||
// fileNode should be under construction
|
||||
BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
|
||||
BlockInfo uc = fileNode.removeLastBlock(block);
|
||||
if (uc == null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -236,8 +237,7 @@ class FSDirWriteFileOp {
|
|||
} else {
|
||||
// add new chosen targets to already allocated block and return
|
||||
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
|
||||
((BlockInfoContiguousUnderConstruction) lastBlockInFile)
|
||||
.setExpectedLocations(targets);
|
||||
lastBlockInFile.getUnderConstructionFeature().setExpectedLocations(lastBlockInFile.getGenerationStamp(), targets);
|
||||
offset = pendingFile.computeFileSize();
|
||||
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
|
||||
}
|
||||
|
@ -520,12 +520,10 @@ class FSDirWriteFileOp {
|
|||
fileINode.getPreferredBlockReplication(), true);
|
||||
|
||||
// associate new last block for the file
|
||||
BlockInfoContiguousUnderConstruction blockInfo =
|
||||
new BlockInfoContiguousUnderConstruction(
|
||||
block,
|
||||
fileINode.getFileReplication(),
|
||||
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
|
||||
targets);
|
||||
BlockInfo blockInfo = new BlockInfoContiguous(block,
|
||||
fileINode.getFileReplication());
|
||||
blockInfo.convertToBlockUnderConstruction(
|
||||
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
|
||||
fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
|
||||
fileINode.addBlock(blockInfo);
|
||||
|
||||
|
@ -662,10 +660,10 @@ class FSDirWriteFileOp {
|
|||
"allocation of a new block in " + src + ". Returning previously" +
|
||||
" allocated block " + lastBlockInFile);
|
||||
long offset = file.computeFileSize();
|
||||
BlockInfoContiguousUnderConstruction lastBlockUC =
|
||||
(BlockInfoContiguousUnderConstruction) lastBlockInFile;
|
||||
BlockUnderConstructionFeature uc =
|
||||
lastBlockInFile.getUnderConstructionFeature();
|
||||
onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
|
||||
lastBlockUC.getExpectedStorageLocations(), offset);
|
||||
uc.getExpectedStorageLocations(), offset);
|
||||
return new FileState(file, src, iip);
|
||||
} else {
|
||||
// Case 3
|
||||
|
|
|
@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
|||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
|
@ -953,7 +952,7 @@ public class FSEditLogLoader {
|
|||
if (pBlock != null) { // the penultimate block is not null
|
||||
Preconditions.checkState(oldBlocks != null && oldBlocks.length > 0);
|
||||
// compare pBlock with the last block of oldBlocks
|
||||
Block oldLastBlock = oldBlocks[oldBlocks.length - 1];
|
||||
BlockInfo oldLastBlock = oldBlocks[oldBlocks.length - 1];
|
||||
if (oldLastBlock.getBlockId() != pBlock.getBlockId()
|
||||
|| oldLastBlock.getGenerationStamp() != pBlock.getGenerationStamp()) {
|
||||
throw new IOException(
|
||||
|
@ -963,17 +962,18 @@ public class FSEditLogLoader {
|
|||
}
|
||||
|
||||
oldLastBlock.setNumBytes(pBlock.getNumBytes());
|
||||
if (oldLastBlock instanceof BlockInfoContiguousUnderConstruction) {
|
||||
fsNamesys.getBlockManager().forceCompleteBlock(file,
|
||||
(BlockInfoContiguousUnderConstruction) oldLastBlock);
|
||||
if (!oldLastBlock.isComplete()) {
|
||||
fsNamesys.getBlockManager().forceCompleteBlock(file, oldLastBlock);
|
||||
fsNamesys.getBlockManager().processQueuedMessagesForBlock(pBlock);
|
||||
}
|
||||
} else { // the penultimate block is null
|
||||
Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0);
|
||||
}
|
||||
// add the new block
|
||||
BlockInfo newBI = new BlockInfoContiguousUnderConstruction(
|
||||
newBlock, file.getPreferredBlockReplication());
|
||||
BlockInfo newBI = new BlockInfoContiguous(newBlock,
|
||||
file.getPreferredBlockReplication());
|
||||
newBI.convertToBlockUnderConstruction(
|
||||
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
|
||||
fsNamesys.getBlockManager().addBlockCollection(newBI, file);
|
||||
file.addBlock(newBI);
|
||||
fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
|
||||
|
@ -1013,11 +1013,10 @@ public class FSEditLogLoader {
|
|||
oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
|
||||
oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
|
||||
|
||||
if (oldBlock instanceof BlockInfoContiguousUnderConstruction &&
|
||||
if (!oldBlock.isComplete() &&
|
||||
(!isLastBlock || op.shouldCompleteLastBlock())) {
|
||||
changeMade = true;
|
||||
fsNamesys.getBlockManager().forceCompleteBlock(file,
|
||||
(BlockInfoContiguousUnderConstruction) oldBlock);
|
||||
fsNamesys.getBlockManager().forceCompleteBlock(file, oldBlock);
|
||||
}
|
||||
if (changeMade) {
|
||||
// The state or gen-stamp of the block has changed. So, we may be
|
||||
|
@ -1052,8 +1051,10 @@ public class FSEditLogLoader {
|
|||
// TODO: shouldn't this only be true for the last block?
|
||||
// what about an old-version fsync() where fsync isn't called
|
||||
// until several blocks in?
|
||||
newBI = new BlockInfoContiguousUnderConstruction(
|
||||
newBlock, file.getPreferredBlockReplication());
|
||||
newBI = new BlockInfoContiguous(newBlock,
|
||||
file.getPreferredBlockReplication());
|
||||
newBI.convertToBlockUnderConstruction(
|
||||
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
|
||||
} else {
|
||||
// OP_CLOSE should add finalized blocks. This code path
|
||||
// is only executed when loading edits written by prior
|
||||
|
|
|
@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
|||
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
|
@ -777,8 +776,8 @@ public class FSImageFormat {
|
|||
// convert the last block to BlockUC
|
||||
if (blocks.length > 0) {
|
||||
BlockInfo lastBlk = blocks[blocks.length - 1];
|
||||
blocks[blocks.length - 1] = new BlockInfoContiguousUnderConstruction(
|
||||
lastBlk, replication);
|
||||
lastBlk.convertToBlockUnderConstruction(
|
||||
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,8 +44,8 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
|
|||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
|
||||
|
@ -362,9 +362,8 @@ public final class FSImageFormatPBINode {
|
|||
file.toUnderConstruction(uc.getClientName(), uc.getClientMachine());
|
||||
if (blocks.length > 0) {
|
||||
BlockInfo lastBlk = file.getLastBlock();
|
||||
// replace the last block of file
|
||||
file.setBlock(file.numBlocks() - 1, new BlockInfoContiguousUnderConstruction(
|
||||
lastBlk, replication));
|
||||
lastBlk.convertToBlockUnderConstruction(
|
||||
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
|
||||
}
|
||||
}
|
||||
return file;
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
|
||||
|
@ -137,8 +136,9 @@ public class FSImageSerialization {
|
|||
// last block is UNDER_CONSTRUCTION
|
||||
if(numBlocks > 0) {
|
||||
blk.readFields(in);
|
||||
blocks[i] = new BlockInfoContiguousUnderConstruction(
|
||||
blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
|
||||
blocks[i] = new BlockInfoContiguous(blk, blockReplication);
|
||||
blocks[i].convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
|
||||
null);
|
||||
}
|
||||
PermissionStatus perm = PermissionStatus.read(in);
|
||||
String clientName = readString(in);
|
||||
|
@ -180,9 +180,9 @@ public class FSImageSerialization {
|
|||
|
||||
/**
|
||||
* Serialize a {@link INodeFile} node
|
||||
* @param node The node to write
|
||||
* @param file The INodeFile to write
|
||||
* @param out The {@link DataOutputStream} where the fields are written
|
||||
* @param writeBlock Whether to write block information
|
||||
* @param writeUnderConstruction Whether to write under construction information
|
||||
*/
|
||||
public static void writeINodeFile(INodeFile file, DataOutput out,
|
||||
boolean writeUnderConstruction) throws IOException {
|
||||
|
|
|
@ -146,7 +146,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
|||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
|
@ -203,8 +202,8 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
|
||||
|
@ -3056,8 +3055,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
|
||||
|
||||
// If penultimate block doesn't exist then its minReplication is met
|
||||
boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
|
||||
blockManager.checkMinReplication(penultimateBlock);
|
||||
boolean penultimateBlockMinReplication = penultimateBlock == null
|
||||
|| blockManager.checkMinReplication(penultimateBlock);
|
||||
|
||||
switch(lastBlockState) {
|
||||
case COMPLETE:
|
||||
|
@ -3086,24 +3085,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
throw new AlreadyBeingCreatedException(message);
|
||||
case UNDER_CONSTRUCTION:
|
||||
case UNDER_RECOVERY:
|
||||
final BlockInfoContiguousUnderConstruction uc = (BlockInfoContiguousUnderConstruction)lastBlock;
|
||||
BlockUnderConstructionFeature uc = lastBlock.getUnderConstructionFeature();
|
||||
// determine if last block was intended to be truncated
|
||||
Block recoveryBlock = uc.getTruncateBlock();
|
||||
boolean truncateRecovery = recoveryBlock != null;
|
||||
boolean copyOnTruncate = truncateRecovery &&
|
||||
recoveryBlock.getBlockId() != uc.getBlockId();
|
||||
recoveryBlock.getBlockId() != lastBlock.getBlockId();
|
||||
assert !copyOnTruncate ||
|
||||
recoveryBlock.getBlockId() < uc.getBlockId() &&
|
||||
recoveryBlock.getGenerationStamp() < uc.getGenerationStamp() &&
|
||||
recoveryBlock.getNumBytes() > uc.getNumBytes() :
|
||||
recoveryBlock.getBlockId() < lastBlock.getBlockId() &&
|
||||
recoveryBlock.getGenerationStamp() < lastBlock.getGenerationStamp() &&
|
||||
recoveryBlock.getNumBytes() > lastBlock.getNumBytes() :
|
||||
"wrong recoveryBlock";
|
||||
|
||||
// setup the last block locations from the blockManager if not known
|
||||
if (uc.getNumExpectedLocations() == 0) {
|
||||
uc.setExpectedLocations(blockManager.getStorages(lastBlock));
|
||||
uc.setExpectedLocations(lastBlock.getGenerationStamp(),
|
||||
blockManager.getStorages(lastBlock));
|
||||
}
|
||||
|
||||
if (uc.getNumExpectedLocations() == 0 && uc.getNumBytes() == 0) {
|
||||
if (uc.getNumExpectedLocations() == 0 && lastBlock.getNumBytes() == 0) {
|
||||
// There is no datanode reported to this block.
|
||||
// may be client have crashed before writing data to pipeline.
|
||||
// This blocks doesn't need any recovery.
|
||||
|
@ -3116,14 +3116,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return true;
|
||||
}
|
||||
// start recovery of the last block for this file
|
||||
long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc));
|
||||
long blockRecoveryId = nextGenerationStamp(
|
||||
blockIdManager.isLegacyBlock(lastBlock));
|
||||
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
|
||||
if(copyOnTruncate) {
|
||||
uc.setGenerationStamp(blockRecoveryId);
|
||||
lastBlock.setGenerationStamp(blockRecoveryId);
|
||||
} else if(truncateRecovery) {
|
||||
recoveryBlock.setGenerationStamp(blockRecoveryId);
|
||||
}
|
||||
uc.initializeBlockRecovery(blockRecoveryId);
|
||||
uc.initializeBlockRecovery(lastBlock, blockRecoveryId);
|
||||
leaseManager.renewLease(lease);
|
||||
// Cannot close file right now, since the last block requires recovery.
|
||||
// This may potentially cause infinite loop in lease recovery
|
||||
|
@ -3202,7 +3203,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean isInSnapshot(BlockInfoContiguousUnderConstruction blockUC) {
|
||||
public boolean isInSnapshot(BlockInfo blockUC) {
|
||||
assert hasReadLock();
|
||||
final BlockCollection bc = blockUC.getBlockCollection();
|
||||
if (bc == null || !(bc instanceof INodeFile)
|
||||
|
@ -3249,7 +3250,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
waitForLoadingFSImage();
|
||||
writeLock();
|
||||
boolean copyTruncate = false;
|
||||
BlockInfoContiguousUnderConstruction truncatedBlock = null;
|
||||
BlockInfo truncatedBlock = null;
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
// If a DN tries to commit to the standby, the recovery will
|
||||
|
@ -3306,9 +3307,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return;
|
||||
}
|
||||
|
||||
truncatedBlock = (BlockInfoContiguousUnderConstruction) iFile
|
||||
.getLastBlock();
|
||||
long recoveryId = truncatedBlock.getBlockRecoveryId();
|
||||
truncatedBlock = iFile.getLastBlock();
|
||||
long recoveryId = truncatedBlock.getUnderConstructionFeature()
|
||||
.getBlockRecoveryId();
|
||||
copyTruncate = truncatedBlock.getBlockId() != storedBlock.getBlockId();
|
||||
if(recoveryId != newgenerationstamp) {
|
||||
throw new IOException("The recovery id " + newgenerationstamp
|
||||
|
@ -3371,9 +3372,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
|
||||
trimmedStorages.toArray(new String[trimmedStorages.size()]));
|
||||
if(copyTruncate) {
|
||||
iFile.setLastBlock(truncatedBlock, trimmedStorageInfos);
|
||||
iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos);
|
||||
} else {
|
||||
iFile.setLastBlock(storedBlock, trimmedStorageInfos);
|
||||
iFile.convertLastBlockToUC(storedBlock, trimmedStorageInfos);
|
||||
if (closeFile) {
|
||||
blockManager.markBlockReplicasAsCorrupt(storedBlock,
|
||||
oldGenerationStamp, oldNumBytes, trimmedStorageInfos);
|
||||
|
@ -5346,8 +5347,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
assert hasWriteLock();
|
||||
// check the vadility of the block and lease holder name
|
||||
final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
|
||||
final BlockInfoContiguousUnderConstruction blockinfo
|
||||
= (BlockInfoContiguousUnderConstruction)pendingFile.getLastBlock();
|
||||
final BlockInfo blockinfo = pendingFile.getLastBlock();
|
||||
assert !blockinfo.isComplete();
|
||||
|
||||
// check new GS & length: this is not expected
|
||||
if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() ||
|
||||
|
@ -5366,7 +5367,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
// find the DatanodeDescriptor objects
|
||||
final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
|
||||
.getDatanodeStorageInfos(newNodes, newStorageIDs);
|
||||
blockinfo.setExpectedLocations(storages);
|
||||
blockinfo.getUnderConstructionFeature().setExpectedLocations(
|
||||
blockinfo.getGenerationStamp(), storages);
|
||||
|
||||
String src = pendingFile.getFullPathName();
|
||||
FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);
|
||||
|
|
|
@ -21,7 +21,6 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||
|
||||
/**
|
||||
|
@ -61,7 +60,7 @@ public class FileUnderConstructionFeature implements INode.Feature {
|
|||
BlockInfo lastBlock = f.getLastBlock();
|
||||
assert (lastBlock != null) : "The last block for path "
|
||||
+ f.getFullPathName() + " is null when updating its length";
|
||||
assert (lastBlock instanceof BlockInfoContiguousUnderConstruction)
|
||||
assert !lastBlock.isComplete()
|
||||
: "The last block for path " + f.getFullPathName()
|
||||
+ " is not a BlockInfoUnderConstruction when updating its length";
|
||||
lastBlock.setNumBytes(lastBlockLength);
|
||||
|
@ -76,9 +75,8 @@ public class FileUnderConstructionFeature implements INode.Feature {
|
|||
final BlocksMapUpdateInfo collectedBlocks) {
|
||||
final BlockInfo[] blocks = f.getBlocks();
|
||||
if (blocks != null && blocks.length > 0
|
||||
&& blocks[blocks.length - 1] instanceof BlockInfoContiguousUnderConstruction) {
|
||||
BlockInfoContiguousUnderConstruction lastUC =
|
||||
(BlockInfoContiguousUnderConstruction) blocks[blocks.length - 1];
|
||||
&& !blocks[blocks.length - 1].isComplete()) {
|
||||
BlockInfo lastUC = blocks[blocks.length - 1];
|
||||
if (lastUC.getNumBytes() == 0) {
|
||||
// this is a 0-sized block. do not need check its UC state here
|
||||
collectedBlocks.addDeleteBlock(lastUC);
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
|||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
|
@ -81,7 +80,7 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
* Bit format:
|
||||
* [4-bit storagePolicyID][12-bit replication][48-bit preferredBlockSize]
|
||||
*/
|
||||
static enum HeaderFormat {
|
||||
enum HeaderFormat {
|
||||
PREFERRED_BLOCK_SIZE(null, 48, 1),
|
||||
REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1),
|
||||
STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH,
|
||||
|
@ -231,27 +230,28 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
}
|
||||
|
||||
@Override // BlockCollection, the file should be under construction
|
||||
public BlockInfoContiguousUnderConstruction setLastBlock(
|
||||
BlockInfo lastBlock, DatanodeStorageInfo[] locations)
|
||||
throws IOException {
|
||||
public void convertLastBlockToUC(BlockInfo lastBlock,
|
||||
DatanodeStorageInfo[] locations) throws IOException {
|
||||
Preconditions.checkState(isUnderConstruction(),
|
||||
"file is no longer under construction");
|
||||
|
||||
if (numBlocks() == 0) {
|
||||
throw new IOException("Failed to set last block: File is empty.");
|
||||
}
|
||||
BlockInfoContiguousUnderConstruction ucBlock =
|
||||
lastBlock.convertToBlockUnderConstruction(
|
||||
BlockUCState.UNDER_CONSTRUCTION, locations);
|
||||
setBlock(numBlocks() - 1, ucBlock);
|
||||
return ucBlock;
|
||||
lastBlock.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
|
||||
locations);
|
||||
}
|
||||
|
||||
void setLastBlock(BlockInfo blk) {
|
||||
blk.setBlockCollection(this);
|
||||
setBlock(numBlocks() - 1, blk);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a block from the block list. This block should be
|
||||
* the last one on the list.
|
||||
*/
|
||||
BlockInfoContiguousUnderConstruction removeLastBlock(Block oldblock) {
|
||||
BlockInfo removeLastBlock(Block oldblock) {
|
||||
Preconditions.checkState(isUnderConstruction(),
|
||||
"file is no longer under construction");
|
||||
if (blocks == null || blocks.length == 0) {
|
||||
|
@ -262,13 +262,12 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
return null;
|
||||
}
|
||||
|
||||
BlockInfoContiguousUnderConstruction uc =
|
||||
(BlockInfoContiguousUnderConstruction)blocks[size_1];
|
||||
BlockInfo ucBlock = blocks[size_1];
|
||||
//copy to a new list
|
||||
BlockInfo[] newlist = new BlockInfo[size_1];
|
||||
System.arraycopy(blocks, 0, newlist, 0, size_1);
|
||||
setBlocks(newlist);
|
||||
return uc;
|
||||
return ucBlock;
|
||||
}
|
||||
|
||||
/* End of Under-Construction Feature */
|
||||
|
@ -696,7 +695,7 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
final int last = blocks.length - 1;
|
||||
//check if the last block is BlockInfoUnderConstruction
|
||||
long size = blocks[last].getNumBytes();
|
||||
if (blocks[last] instanceof BlockInfoContiguousUnderConstruction) {
|
||||
if (!blocks[last].isComplete()) {
|
||||
if (!includesLastUcBlock) {
|
||||
size = 0;
|
||||
} else if (usePreferredBlockSize4LastUcBlock) {
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||
import org.apache.hadoop.hdfs.util.RwLock;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
|
@ -45,7 +45,7 @@ public interface Namesystem extends RwLock, SafeMode {
|
|||
|
||||
void checkOperation(OperationCategory read) throws StandbyException;
|
||||
|
||||
boolean isInSnapshot(BlockInfoContiguousUnderConstruction blockUC);
|
||||
boolean isInSnapshot(BlockInfo blockUC);
|
||||
|
||||
CacheManager getCacheManager();
|
||||
}
|
|
@ -22,7 +22,6 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||
|
@ -133,7 +132,7 @@ public class FileDiffList extends
|
|||
Block dontRemoveBlock = null;
|
||||
if (lastBlock != null && lastBlock.getBlockUCState().equals(
|
||||
HdfsServerConstants.BlockUCState.UNDER_RECOVERY)) {
|
||||
dontRemoveBlock = ((BlockInfoContiguousUnderConstruction) lastBlock)
|
||||
dontRemoveBlock = lastBlock.getUnderConstructionFeature()
|
||||
.getTruncateBlock();
|
||||
}
|
||||
// Collect the remaining blocks of the file, ignoring truncate block
|
||||
|
|
|
@ -110,7 +110,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
|
|||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
|
@ -1644,13 +1643,11 @@ public class DFSTestUtil {
|
|||
BlockManager bm0 = nn.getNamesystem().getBlockManager();
|
||||
BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
|
||||
assertTrue("Block " + blk + " should be under construction, " +
|
||||
"got: " + storedBlock,
|
||||
storedBlock instanceof BlockInfoContiguousUnderConstruction);
|
||||
BlockInfoContiguousUnderConstruction ucBlock =
|
||||
(BlockInfoContiguousUnderConstruction)storedBlock;
|
||||
"got: " + storedBlock, !storedBlock.isComplete());
|
||||
// We expect that the replica with the most recent heart beat will be
|
||||
// the one to be in charge of the synchronization / recovery protocol.
|
||||
final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
|
||||
final DatanodeStorageInfo[] storages = storedBlock
|
||||
.getUnderConstructionFeature().getExpectedStorageLocations();
|
||||
DatanodeStorageInfo expectedPrimary = storages[0];
|
||||
long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor()
|
||||
.getLastUpdateMonotonic();
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -40,25 +39,24 @@ public class TestBlockInfoUnderConstruction {
|
|||
DatanodeDescriptor dd3 = s3.getDatanodeDescriptor();
|
||||
|
||||
dd1.isAlive = dd2.isAlive = dd3.isAlive = true;
|
||||
BlockInfoContiguousUnderConstruction blockInfo = new BlockInfoContiguousUnderConstruction(
|
||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP),
|
||||
(short) 3,
|
||||
BlockUCState.UNDER_CONSTRUCTION,
|
||||
BlockInfoContiguous blockInfo = new BlockInfoContiguous(
|
||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
|
||||
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
|
||||
new DatanodeStorageInfo[] {s1, s2, s3});
|
||||
|
||||
// Recovery attempt #1.
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -3 * 1000);
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000);
|
||||
blockInfo.initializeBlockRecovery(1);
|
||||
BlockInfoContiguousUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
|
||||
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 1);
|
||||
BlockInfo[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
|
||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||
|
||||
// Recovery attempt #2.
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
|
||||
blockInfo.initializeBlockRecovery(2);
|
||||
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 2);
|
||||
blockInfoRecovery = dd1.getLeaseRecoveryCommand(1);
|
||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||
|
||||
|
@ -66,7 +64,7 @@ public class TestBlockInfoUnderConstruction {
|
|||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
|
||||
blockInfo.initializeBlockRecovery(3);
|
||||
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 3);
|
||||
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
|
||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||
|
||||
|
@ -75,7 +73,7 @@ public class TestBlockInfoUnderConstruction {
|
|||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
|
||||
blockInfo.initializeBlockRecovery(3);
|
||||
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 3);
|
||||
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
|
||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
@ -726,8 +727,8 @@ public class TestBlockManager {
|
|||
// verify the storage info is correct
|
||||
assertTrue(bm.getStoredBlock(new Block(receivedBlockId)).findStorageInfo
|
||||
(ds) >= 0);
|
||||
assertTrue(((BlockInfoContiguousUnderConstruction) bm.
|
||||
getStoredBlock(new Block(receivingBlockId))).getNumExpectedLocations() > 0);
|
||||
assertTrue(bm.getStoredBlock(new Block(receivingBlockId))
|
||||
.getUnderConstructionFeature().getNumExpectedLocations() > 0);
|
||||
assertTrue(bm.getStoredBlock(new Block(receivingReceivedBlockId))
|
||||
.findStorageInfo(ds) >= 0);
|
||||
assertNull(bm.getStoredBlock(new Block(ReceivedDeletedBlockId)));
|
||||
|
@ -747,8 +748,8 @@ public class TestBlockManager {
|
|||
|
||||
private BlockInfo addUcBlockToBM(long blkId) {
|
||||
Block block = new Block(blkId);
|
||||
BlockInfoContiguousUnderConstruction blockInfo =
|
||||
new BlockInfoContiguousUnderConstruction(block, (short) 3);
|
||||
BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
|
||||
blockInfo.convertToBlockUnderConstruction(UNDER_CONSTRUCTION, null);
|
||||
BlockCollection bc = Mockito.mock(BlockCollection.class);
|
||||
Mockito.doReturn((short) 3).when(bc).getPreferredBlockReplication();
|
||||
bm.blocksMap.addBlockCollection(blockInfo, bc);
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
|||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -173,9 +172,10 @@ public class TestHeartbeatHandling {
|
|||
dd1.getStorageInfos()[0],
|
||||
dd2.getStorageInfos()[0],
|
||||
dd3.getStorageInfos()[0]};
|
||||
BlockInfoContiguousUnderConstruction blockInfo = new BlockInfoContiguousUnderConstruction(
|
||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3,
|
||||
BlockUCState.UNDER_RECOVERY, storages);
|
||||
BlockInfo blockInfo = new BlockInfoContiguous(
|
||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
|
||||
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_RECOVERY,
|
||||
storages);
|
||||
dd1.addBlockToBeRecovered(blockInfo);
|
||||
DatanodeCommand[] cmds =
|
||||
NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
|
||||
|
@ -195,9 +195,10 @@ public class TestHeartbeatHandling {
|
|||
// More than the default stale interval of 30 seconds.
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -40 * 1000);
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
|
||||
blockInfo = new BlockInfoContiguousUnderConstruction(
|
||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3,
|
||||
BlockUCState.UNDER_RECOVERY, storages);
|
||||
blockInfo = new BlockInfoContiguous(
|
||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
|
||||
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_RECOVERY,
|
||||
storages);
|
||||
dd1.addBlockToBeRecovered(blockInfo);
|
||||
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
|
||||
assertEquals(1, cmds.length);
|
||||
|
@ -216,9 +217,10 @@ public class TestHeartbeatHandling {
|
|||
// More than the default stale interval of 30 seconds.
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, - 40 * 1000);
|
||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, - 80 * 1000);
|
||||
blockInfo = new BlockInfoContiguousUnderConstruction(
|
||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3,
|
||||
BlockUCState.UNDER_RECOVERY, storages);
|
||||
blockInfo = new BlockInfoContiguous(
|
||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
|
||||
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_RECOVERY,
|
||||
storages);
|
||||
dd1.addBlockToBeRecovered(blockInfo);
|
||||
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
|
||||
assertEquals(1, cmds.length);
|
||||
|
|
|
@ -1182,7 +1182,8 @@ public class TestReplicationPolicy {
|
|||
// block under construction, the BlockManager will realize the expected
|
||||
// replication has been achieved and remove it from the under-replicated
|
||||
// queue.
|
||||
BlockInfoContiguousUnderConstruction info = new BlockInfoContiguousUnderConstruction(block1, (short) 1);
|
||||
BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1);
|
||||
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, null);
|
||||
BlockCollection bc = mock(BlockCollection.class);
|
||||
when(bc.getPreferredBlockReplication()).thenReturn((short)1);
|
||||
bm.addBlockCollection(info, bc);
|
||||
|
@ -1238,9 +1239,8 @@ public class TestReplicationPolicy {
|
|||
|
||||
DatanodeStorageInfo[] storageAry = {new DatanodeStorageInfo(
|
||||
dataNodes[0], new DatanodeStorage("s1"))};
|
||||
final BlockInfoContiguousUnderConstruction ucBlock =
|
||||
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
|
||||
storageAry);
|
||||
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
|
||||
storageAry);
|
||||
DatanodeStorageInfo storage = mock(DatanodeStorageInfo.class);
|
||||
DatanodeDescriptor dn = mock(DatanodeDescriptor.class);
|
||||
when(dn.isDecommissioned()).thenReturn(true);
|
||||
|
@ -1249,10 +1249,9 @@ public class TestReplicationPolicy {
|
|||
when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
|
||||
when(storage.addBlock(any(BlockInfo.class))).thenReturn
|
||||
(DatanodeStorageInfo.AddBlockResult.ADDED);
|
||||
ucBlock.addStorage(storage);
|
||||
info.addStorage(storage);
|
||||
|
||||
when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
|
||||
.thenReturn(ucBlock);
|
||||
when(mbc.getLastBlock()).thenReturn(info);
|
||||
|
||||
bm.convertLastBlockToUnderConstruction(mbc, 0L);
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -36,7 +37,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -156,6 +157,7 @@ public class TestBlockUnderConstruction {
|
|||
@Test
|
||||
public void testGetBlockLocations() throws IOException {
|
||||
final NamenodeProtocols namenode = cluster.getNameNodeRpc();
|
||||
final BlockManager blockManager = cluster.getNamesystem().getBlockManager();
|
||||
final Path p = new Path(BASE_DIR, "file2.dat");
|
||||
final String src = p.toString();
|
||||
final FSDataOutputStream out = TestFileCreation.createFile(hdfs, p, 3);
|
||||
|
@ -170,7 +172,7 @@ public class TestBlockUnderConstruction {
|
|||
final List<LocatedBlock> blocks = lb.getLocatedBlocks();
|
||||
assertEquals(i, blocks.size());
|
||||
final Block b = blocks.get(blocks.size() - 1).getBlock().getLocalBlock();
|
||||
assertTrue(b instanceof BlockInfoContiguousUnderConstruction);
|
||||
assertFalse(blockManager.getStoredBlock(b).isComplete());
|
||||
|
||||
if (++i < NUM_BLOCKS) {
|
||||
// write one more block
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.junit.Test;
|
||||
|
@ -68,11 +67,13 @@ public class TestCommitBlockSynchronization {
|
|||
namesystem.dir.getINodeMap().put(file);
|
||||
|
||||
FSNamesystem namesystemSpy = spy(namesystem);
|
||||
BlockInfoContiguousUnderConstruction blockInfo = new BlockInfoContiguousUnderConstruction(
|
||||
block, (short) 1, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
|
||||
BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 1);
|
||||
blockInfo.convertToBlockUnderConstruction(
|
||||
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
|
||||
blockInfo.setBlockCollection(file);
|
||||
blockInfo.setGenerationStamp(genStamp);
|
||||
blockInfo.initializeBlockRecovery(genStamp);
|
||||
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo,
|
||||
genStamp);
|
||||
doReturn(blockInfo).when(file).removeLastBlock(any(Block.class));
|
||||
doReturn(true).when(file).isUnderConstruction();
|
||||
doReturn(new BlockInfoContiguous[1]).when(file).getBlocks();
|
||||
|
|
|
@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -1016,7 +1015,7 @@ public class TestFileTruncate {
|
|||
is(fsn.getBlockIdManager().getGenerationStampV2()));
|
||||
assertThat(file.getLastBlock().getBlockUCState(),
|
||||
is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
|
||||
long blockRecoveryId = ((BlockInfoContiguousUnderConstruction) file.getLastBlock())
|
||||
long blockRecoveryId = file.getLastBlock().getUnderConstructionFeature()
|
||||
.getBlockRecoveryId();
|
||||
assertThat(blockRecoveryId, is(initialGenStamp + 1));
|
||||
fsn.getEditLog().logTruncate(
|
||||
|
@ -1049,7 +1048,7 @@ public class TestFileTruncate {
|
|||
is(fsn.getBlockIdManager().getGenerationStampV2()));
|
||||
assertThat(file.getLastBlock().getBlockUCState(),
|
||||
is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
|
||||
long blockRecoveryId = ((BlockInfoContiguousUnderConstruction) file.getLastBlock())
|
||||
long blockRecoveryId = file.getLastBlock().getUnderConstructionFeature()
|
||||
.getBlockRecoveryId();
|
||||
assertThat(blockRecoveryId, is(initialGenStamp + 1));
|
||||
fsn.getEditLog().logTruncate(
|
||||
|
|
|
@ -72,7 +72,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
||||
|
@ -752,12 +752,13 @@ public class TestRetryCacheWithHA {
|
|||
boolean checkNamenodeBeforeReturn() throws Exception {
|
||||
INodeFile fileNode = cluster.getNamesystem(0).getFSDirectory()
|
||||
.getINode4Write(file).asFile();
|
||||
BlockInfoContiguousUnderConstruction blkUC =
|
||||
(BlockInfoContiguousUnderConstruction) (fileNode.getBlocks())[1];
|
||||
int datanodeNum = blkUC.getExpectedStorageLocations().length;
|
||||
BlockInfo blkUC = (fileNode.getBlocks())[1];
|
||||
int datanodeNum = blkUC.getUnderConstructionFeature()
|
||||
.getExpectedStorageLocations().length;
|
||||
for (int i = 0; i < CHECKTIMES && datanodeNum != 2; i++) {
|
||||
Thread.sleep(1000);
|
||||
datanodeNum = blkUC.getExpectedStorageLocations().length;
|
||||
datanodeNum = blkUC.getUnderConstructionFeature()
|
||||
.getExpectedStorageLocations().length;
|
||||
}
|
||||
return datanodeNum == 2;
|
||||
}
|
||||
|
|
|
@ -44,8 +44,8 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
|
@ -177,8 +177,7 @@ public class SnapshotTestHelper {
|
|||
* Specific information for different types of INode:
|
||||
* {@link INodeDirectory}:childrenSize
|
||||
* {@link INodeFile}: fileSize, block list. Check {@link BlockInfo#toString()}
|
||||
* and {@link BlockInfoContiguousUnderConstruction#toString()} for detailed information.
|
||||
* {@link FileWithSnapshot}: next link
|
||||
* and {@link BlockUnderConstructionFeature#toString()} for detailed information.
|
||||
* </pre>
|
||||
* @see INode#dumpTreeRecursively()
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue