HDFS-8823. Move replication factor into individual blocks. Contributed by Haohui Mai.

Haohui Mai 2015-08-22 00:09:40 -07:00
parent 7087e700e0
commit 745d04be59
26 changed files with 256 additions and 156 deletions
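In short: the expected replication of a block no longer comes from its owning file (BlockCollection#getPreferredBlockReplication); each BlockInfo now carries its own replication field. A minimal runnable sketch of the new shape, using illustrative names rather than the actual Hadoop classes:

public class PerBlockReplicationSketch {
  static class Block {
    private short replication;   // mirrors the new per-block field in BlockInfo

    Block(short replication) {
      this.replication = replication;
    }

    short getReplication() {
      return replication;
    }

    void setReplication(short repl) {
      this.replication = repl;
    }
  }

  public static void main(String[] args) {
    Block b = new Block((short) 3);
    System.out.println(b.getReplication());  // 3: read from the block itself
    b.setReplication((short) 2);             // e.g. after setrep on the file
    System.out.println(b.getReplication());  // 2
  }
}

Before this change, callers wrote block.getBlockCollection().getPreferredBlockReplication(); afterwards they ask the block itself, which lets snapshot deletion lower a block's factor without consulting the file.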

View File: CHANGES.txt

@@ -824,7 +824,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8828. Utilize Snapshot diff report to build diff copy list in distcp.
     (Yufei Gu via Yongjun Zhang)

+    HDFS-8823. Move replication factor into individual blocks. (wheat9)
+
   OPTIMIZATIONS

     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File: BlockCollection.java

@@ -55,12 +55,6 @@ public interface BlockCollection {
   public long getPreferredBlockSize();

   /**
-   * Get block replication for the collection
-   * @return block replication value
-   */
-  public short getPreferredBlockReplication();
-
-  /**
    * @return the storage policy ID.
    */
   public byte getStoragePolicyID();

View File: BlockInfo.java

@@ -40,9 +40,14 @@ public abstract class BlockInfo extends Block
   public static final BlockInfo[] EMPTY_ARRAY = {};

+  /**
+   * Replication factor
+   */
+  private short replication;
+
   private BlockCollection bc;

-  /** For implementing {@link LightWeightGSet.LinkedElement} interface */
+  /** For implementing {@link LightWeightGSet.LinkedElement} interface. */
   private LightWeightGSet.LinkedElement nextLinkedElement;

   /**
@@ -68,12 +73,14 @@ public abstract class BlockInfo extends Block
   public BlockInfo(short replication) {
     this.triplets = new Object[3*replication];
     this.bc = null;
+    this.replication = replication;
   }

   public BlockInfo(Block blk, short replication) {
     super(blk);
     this.triplets = new Object[3*replication];
     this.bc = null;
+    this.replication = replication;
   }

   /**
@@ -81,11 +88,18 @@ public abstract class BlockInfo extends Block
    * @param from BlockInfo to copy from.
    */
   protected BlockInfo(BlockInfo from) {
-    super(from);
-    this.triplets = new Object[from.triplets.length];
+    this(from, from.getReplication());
     this.bc = from.bc;
   }

+  public short getReplication() {
+    return replication;
+  }
+
+  public void setReplication(short repl) {
+    this.replication = repl;
+  }
+
   public BlockCollection getBlockCollection() {
     return bc;
   }

View File: BlockManager.java

@@ -1187,8 +1187,7 @@ public class BlockManager implements BlockStatsMXBean {
       addToInvalidates(b.corrupted, node);
       return;
     }
-    short expectedReplicas =
-        b.corrupted.getBlockCollection().getPreferredBlockReplication();
+    short expectedReplicas = b.corrupted.getReplication();

     // Add replica to the data-node if it is not already there
     if (storageInfo != null) {
@@ -1363,7 +1362,7 @@ public class BlockManager implements BlockStatsMXBean {
             continue;
           }

-          requiredReplication = bc.getPreferredBlockReplication();
+          requiredReplication = getExpectedReplicaNum(block);

           // get a source data-node
           containingNodes = new ArrayList<DatanodeDescriptor>();
@@ -1447,7 +1446,7 @@ public class BlockManager implements BlockStatsMXBean {
             rw.targets = null;
             continue;
           }
-          requiredReplication = bc.getPreferredBlockReplication();
+          requiredReplication = getExpectedReplicaNum(block);

           // do not schedule more if enough replicas is already pending
           NumberReplicas numReplicas = countNodes(block);
@@ -1712,7 +1711,7 @@ public class BlockManager implements BlockStatsMXBean {
           continue;
         }
         NumberReplicas num = countNodes(timedOutItems[i]);
-        if (isNeededReplication(bi, getReplication(bi), num.liveReplicas())) {
+        if (isNeededReplication(bi, num.liveReplicas())) {
           neededReplications.add(bi, num.liveReplicas(),
               num.decommissionedAndDecommissioning(), getReplication(bi));
         }
@@ -2637,8 +2636,8 @@ public class BlockManager implements BlockStatsMXBean {
     }

     // handle underReplication/overReplication
-    short fileReplication = bc.getPreferredBlockReplication();
-    if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
+    short fileReplication = getExpectedReplicaNum(storedBlock);
+    if (!isNeededReplication(storedBlock, numCurrentReplica)) {
       neededReplications.remove(storedBlock, numCurrentReplica,
           num.decommissionedAndDecommissioning(), fileReplication);
     } else {
@@ -2867,12 +2866,11 @@ public class BlockManager implements BlockStatsMXBean {
       return MisReplicationResult.UNDER_CONSTRUCTION;
     }
     // calculate current replication
-    short expectedReplication =
-        block.getBlockCollection().getPreferredBlockReplication();
+    short expectedReplication = getExpectedReplicaNum(block);
     NumberReplicas num = countNodes(block);
     int numCurrentReplica = num.liveReplicas();
     // add to under-replicated queue if need to be
-    if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
+    if (isNeededReplication(block, numCurrentReplica)) {
       if (neededReplications.add(block, numCurrentReplica, num
           .decommissionedAndDecommissioning(), expectedReplication)) {
         return MisReplicationResult.UNDER_REPLICATED;
@@ -2898,27 +2896,18 @@ public class BlockManager implements BlockStatsMXBean {
   }

   /** Set replication for the blocks. */
-  public void setReplication(final short oldRepl, final short newRepl,
-      final String src, final BlockInfo... blocks) {
+  public void setReplication(
+      final short oldRepl, final short newRepl, final BlockInfo b) {
     if (newRepl == oldRepl) {
       return;
     }

     // update needReplication priority queues
-    for(BlockInfo b : blocks) {
-      updateNeededReplications(b, 0, newRepl-oldRepl);
-    }
+    b.setReplication(newRepl);
+    updateNeededReplications(b, 0, newRepl - oldRepl);

     if (oldRepl > newRepl) {
-      // old replication > the new one; need to remove copies
-      LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
-          + " for " + src);
-      for(BlockInfo b : blocks) {
-        processOverReplicatedBlock(b, newRepl, null, null);
-      }
-    } else { // replication factor is increased
-      LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
-          + " for " + src);
+      processOverReplicatedBlock(b, newRepl, null, null);
     }
   }
@@ -3385,8 +3374,7 @@ public class BlockManager implements BlockStatsMXBean {
     int numOverReplicated = 0;
     while(it.hasNext()) {
       final BlockInfo block = it.next();
-      BlockCollection bc = blocksMap.getBlockCollection(block);
-      short expectedReplication = bc.getPreferredBlockReplication();
+      short expectedReplication = block.getReplication();
       NumberReplicas num = countNodes(block);
       int numCurrentReplica = num.liveReplicas();
       if (numCurrentReplica > expectedReplication) {
@@ -3478,7 +3466,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
     NumberReplicas repl = countNodes(block);
     int curExpectedReplicas = getReplication(block);
-    if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
+    if (isNeededReplication(block, repl.liveReplicas())) {
       neededReplications.update(block, repl.liveReplicas(), repl
           .decommissionedAndDecommissioning(), curExpectedReplicas,
           curReplicasDelta, expectedReplicasDelta);
@@ -3500,10 +3488,10 @@ public class BlockManager implements BlockStatsMXBean {
    * process it as an over replicated block.
    */
   public void checkReplication(BlockCollection bc) {
-    final short expected = bc.getPreferredBlockReplication();
     for (BlockInfo block : bc.getBlocks()) {
+      final short expected = block.getReplication();
       final NumberReplicas n = countNodes(block);
-      if (isNeededReplication(block, expected, n.liveReplicas())) {
+      if (isNeededReplication(block, n.liveReplicas())) {
         neededReplications.add(block, n.liveReplicas(),
             n.decommissionedAndDecommissioning(), expected);
       } else if (n.liveReplicas() > expected) {
@@ -3535,12 +3523,10 @@ public class BlockManager implements BlockStatsMXBean {
    * @return 0 if the block is not found;
    *         otherwise, return the replication factor of the block.
    */
-  private int getReplication(Block block) {
-    final BlockCollection bc = blocksMap.getBlockCollection(block);
-    return bc == null? 0: bc.getPreferredBlockReplication();
+  private int getReplication(BlockInfo block) {
+    return getExpectedReplicaNum(block);
   }

   /**
    * Get blocks to invalidate for <i>nodeId</i>
    * in {@link #invalidateBlocks}.
@@ -3581,7 +3567,7 @@ public class BlockManager implements BlockStatsMXBean {
     return toInvalidate.size();
   }

-  boolean blockHasEnoughRacks(Block b) {
+  boolean blockHasEnoughRacks(BlockInfo b) {
     if (!this.shouldCheckForEnoughRacks) {
       return true;
     }
@@ -3617,8 +3603,13 @@ public class BlockManager implements BlockStatsMXBean {
    * A block needs replication if the number of replicas is less than expected
    * or if it does not have enough racks.
    */
-  boolean isNeededReplication(Block b, int expected, int current) {
-    return current < expected || !blockHasEnoughRacks(b);
+  boolean isNeededReplication(BlockInfo storedBlock, int current) {
+    int expected = storedBlock.getReplication();
+    return current < expected || !blockHasEnoughRacks(storedBlock);
+  }
+
+  public short getExpectedReplicaNum(BlockInfo block) {
+    return block.getReplication();
   }

   public long getMissingBlocksCount() {

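The BlockManager change above drops the explicit expected argument: isNeededReplication now reads the expectation from the block itself. A runnable sketch of the simplified predicate, with a boolean standing in for the real blockHasEnoughRacks check (an assumption for illustration):

public class NeededReplicationSketch {
  // A block needs replication if it has fewer live replicas than its own
  // expected count, or if its replicas do not span enough racks.
  static boolean isNeededReplication(short expected, int current,
      boolean hasEnoughRacks) {
    return current < expected || !hasEnoughRacks;
  }

  public static void main(String[] args) {
    System.out.println(isNeededReplication((short) 3, 2, true));   // true
    System.out.println(isNeededReplication((short) 3, 3, true));   // false
    System.out.println(isNeededReplication((short) 3, 3, false));  // true
  }
}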
View File: DecommissionManager.java

@@ -36,7 +36,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.util.CyclicIteration;
@@ -241,9 +240,9 @@ public class DecommissionManager {
   private boolean isSufficientlyReplicated(BlockInfo block,
       BlockCollection bc,
       NumberReplicas numberReplicas) {
-    final int numExpected = bc.getPreferredBlockReplication();
+    final int numExpected = block.getReplication();
     final int numLive = numberReplicas.liveReplicas();
-    if (!blockManager.isNeededReplication(block, numExpected, numLive)) {
+    if (!blockManager.isNeededReplication(block, numLive)) {
       // Block doesn't need replication. Skip.
       LOG.trace("Block {} does not need replication.", block);
       return true;
@@ -274,11 +273,12 @@ public class DecommissionManager {
     return false;
   }

-  private static void logBlockReplicationInfo(Block block, BlockCollection bc,
+  private static void logBlockReplicationInfo(BlockInfo block,
+      BlockCollection bc,
       DatanodeDescriptor srcNode, NumberReplicas num,
       Iterable<DatanodeStorageInfo> storages) {
     int curReplicas = num.liveReplicas();
-    int curExpectedReplicas = bc.getPreferredBlockReplication();
+    int curExpectedReplicas = block.getReplication();
     StringBuilder nodeList = new StringBuilder();
     for (DatanodeStorageInfo storage : storages) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@@ -540,8 +540,7 @@ public class DecommissionManager {
       // Schedule under-replicated blocks for replication if not already
       // pending
-      if (blockManager.isNeededReplication(block,
-          bc.getPreferredBlockReplication(), liveReplicas)) {
+      if (blockManager.isNeededReplication(block, liveReplicas)) {
         if (!blockManager.neededReplications.contains(block) &&
             blockManager.pendingReplications.getNumReplicas(block) == 0 &&
             namesystem.isPopulatingReplQueues()) {
@@ -549,7 +548,7 @@ public class DecommissionManager {
           blockManager.neededReplications.add(block,
               curReplicas,
               num.decommissionedAndDecommissioning(),
-              bc.getPreferredBlockReplication());
+              block.getReplication());
         }
       }

View File: FSDirAppendOp.java

@@ -245,7 +245,7 @@ final class FSDirAppendOp {
     final BlockInfo lastBlock = file.getLastBlock();
     if (lastBlock != null) {
       final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes();
-      final short repl = file.getPreferredBlockReplication();
+      final short repl = lastBlock.getReplication();
       delta.addStorageSpace(diff * repl);
       final BlockStoragePolicy policy = fsn.getFSDirectory()
           .getBlockStoragePolicySuite().getPolicy(file.getStoragePolicyID());

View File: FSDirAttrOp.java

@@ -147,13 +147,11 @@ public class FSDirAttrOp {
       fsd.checkPathAccess(pc, iip, FsAction.WRITE);
     }

-    final short[] blockRepls = new short[2]; // 0: old, 1: new
     final BlockInfo[] blocks = unprotectedSetReplication(fsd, src,
-        replication, blockRepls);
+        replication);
     isFile = blocks != null;
     if (isFile) {
       fsd.getEditLog().logSetReplication(src, replication);
-      bm.setReplication(blockRepls[0], blockRepls[1], src, blocks);
     }
   } finally {
     fsd.writeUnlock();
@@ -399,39 +397,51 @@ public class FSDirAttrOp {
   }

   static BlockInfo[] unprotectedSetReplication(
-      FSDirectory fsd, String src, short replication, short[] blockRepls)
+      FSDirectory fsd, String src, short replication)
       throws QuotaExceededException, UnresolvedLinkException,
       SnapshotAccessControlException {
     assert fsd.hasWriteLock();

+    final BlockManager bm = fsd.getBlockManager();
     final INodesInPath iip = fsd.getINodesInPath4Write(src, true);
     final INode inode = iip.getLastINode();
     if (inode == null || !inode.isFile()) {
       return null;
     }
     INodeFile file = inode.asFile();
-    final short oldBR = file.getPreferredBlockReplication();

-    // before setFileReplication, check for increasing block replication.
-    // if replication > oldBR, then newBR == replication.
-    // if replication < oldBR, we don't know newBR yet.
-    if (replication > oldBR) {
-      long dsDelta = file.storagespaceConsumed(null).getStorageSpace() / oldBR;
-      fsd.updateCount(iip, 0L, dsDelta, oldBR, replication, true);
+    // Make sure the directory has sufficient quotas
+    short oldBR = file.getPreferredBlockReplication();
+
+    // Ensure the quota does not exceed
+    if (oldBR < replication) {
+      long size = file.computeFileSize(true, true);
+      fsd.updateCount(iip, 0L, size, oldBR, replication, true);
     }

     file.setFileReplication(replication, iip.getLatestSnapshotId());
+    short targetReplication = (short) Math.max(
+        replication, file.getPreferredBlockReplication());

-    final short newBR = file.getPreferredBlockReplication();
-    // check newBR < oldBR case.
-    if (newBR < oldBR) {
-      long dsDelta = file.storagespaceConsumed(null).getStorageSpace() / newBR;
-      fsd.updateCount(iip, 0L, dsDelta, oldBR, newBR, true);
+    for (BlockInfo b : file.getBlocks()) {
+      if (oldBR == targetReplication) {
+        continue;
+      }
+      if (oldBR > replication) {
+        fsd.updateCount(iip, 0L, b.getNumBytes(), oldBR, targetReplication,
+            true);
+      }
+      bm.setReplication(oldBR, targetReplication, b);
     }

-    if (blockRepls != null) {
-      blockRepls[0] = oldBR;
-      blockRepls[1] = newBR;
+    if (oldBR != -1) {
+      if (oldBR > targetReplication) {
+        FSDirectory.LOG.info("Decreasing replication from {} to {} for {}",
+            oldBR, targetReplication, src);
+      } else {
+        FSDirectory.LOG.info("Increasing replication from {} to {} for {}",
+            oldBR, targetReplication, src);
+      }
     }
     return file.getBlocks();
   }

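The rewritten unprotectedSetReplication charges quota for an increase before any state changes, so an over-quota setrep fails cleanly, and refunds per block on a decrease; the per-block target is Math.max(replication, file.getPreferredBlockReplication()) so snapshots keep their guarantees. A small sketch of the space arithmetic (helper name is illustrative, not a Hadoop API):

public class SetReplicationQuotaSketch {
  // Storage space consumed scales linearly with the replication factor, so
  // the quota delta for a change is fileSize * (newBR - oldBR).
  static long quotaDelta(long fileSize, short oldBR, short newBR) {
    return fileSize * (newBR - oldBR);
  }

  public static void main(String[] args) {
    long fileSize = 3L * 128 * 1024 * 1024;   // three full 128 MB blocks

    // Raising 2 -> 3: charged up front, before setFileReplication runs.
    System.out.println(quotaDelta(fileSize, (short) 2, (short) 3));

    // Lowering 3 -> 2: refunded per block in the loop over file.getBlocks().
    System.out.println(quotaDelta(fileSize, (short) 3, (short) 2));
  }
}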
View File: FSDirConcatOp.java

@@ -170,7 +170,7 @@ class FSDirConcatOp {
     QuotaCounts deltas = new QuotaCounts.Builder().build();
     final short targetRepl = target.getPreferredBlockReplication();
     for (INodeFile src : srcList) {
-      short srcRepl = src.getPreferredBlockReplication();
+      short srcRepl = src.getFileReplication();
       long fileSize = src.computeFileSize();
       if (targetRepl != srcRepl) {
         deltas.addStorageSpace(fileSize * (targetRepl - srcRepl));
@@ -223,7 +223,7 @@ class FSDirConcatOp {
     // the target file can be included in a snapshot
     trgInode.recordModification(targetIIP.getLatestSnapshotId());
     INodeDirectory trgParent = targetIIP.getINode(-2).asDirectory();
-    trgInode.concatBlocks(srcList);
+    trgInode.concatBlocks(srcList, fsd.getBlockManager());

     // since we are in the same dir - we can use same parent to remove files
     int count = 0;

View File: FSDirDeleteOp.java

@@ -47,6 +47,7 @@ class FSDirDeleteOp {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + iip.getPath());
     }
     long filesRemoved = -1;
+    FSNamesystem fsn = fsd.getFSNamesystem();
     fsd.writeLock();
     try {
       if (deleteAllowed(iip, iip.getPath()) ) {
@@ -58,7 +59,9 @@ class FSDirDeleteOp {
         if (unprotectedDelete(fsd, iip, context, mtime)) {
           filesRemoved = context.quotaDelta().getNsDelta();
         }
-        fsd.getFSNamesystem().removeSnapshottableDirs(snapshottableDirs);
+        fsd.updateReplicationFactor(context.collectedBlocks()
+            .toUpdateReplicationInfo());
+        fsn.removeSnapshottableDirs(snapshottableDirs);
         fsd.updateCount(iip, context.quotaDelta(), false);
       }
     } finally {

View File: FSDirRenameOp.java

@@ -729,8 +729,8 @@ class FSDirRenameOp {
     Preconditions.checkState(oldDstChild != null);
     List<INode> removedINodes = new ChunkedArrayList<>();
     List<Long> removedUCFiles = new ChunkedArrayList<>();
-    INode.ReclaimContext context = new INode.ReclaimContext(bsps,
-        collectedBlocks, removedINodes, removedUCFiles);
+    INode.ReclaimContext context = new INode.ReclaimContext(
+        bsps, collectedBlocks, removedINodes, removedUCFiles);
     final boolean filesDeleted;
     if (!oldDstChild.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
       oldDstChild.destroyAndCollectBlocks(context);
@@ -740,6 +740,9 @@ class FSDirRenameOp {
           dstIIP.getLatestSnapshotId());
       filesDeleted = context.quotaDelta().getNsDelta() >= 0;
     }
+    fsd.updateReplicationFactor(context.collectedBlocks()
+        .toUpdateReplicationInfo());
+
     fsd.getFSNamesystem().removeLeasesAndINodes(
         removedUCFiles, removedINodes, false);
     return filesDeleted;

View File: FSDirSnapshotOp.java

@@ -184,6 +184,8 @@ class FSDirSnapshotOp {
       snapshotManager.deleteSnapshot(iip, snapshotName, context);
       fsd.updateCount(iip, context.quotaDelta(), false);
       fsd.removeFromInodeMap(removedINodes);
+      fsd.updateReplicationFactor(context.collectedBlocks()
+          .toUpdateReplicationInfo());
     } finally {
       fsd.writeUnlock();
     }

View File: FSDirWriteFileOp.java

@@ -517,7 +517,7 @@ class FSDirWriteFileOp {
     // check quota limits and updated space consumed
     fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
-        fileINode.getPreferredBlockReplication(), true);
+        fileINode.getFileReplication(), true);

     // associate new last block for the file
     BlockInfo blockInfo = new BlockInfoContiguous(block,

View File: FSDirectory.java

@@ -48,9 +48,11 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo.UpdatedReplicationInfo;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.security.AccessControlException;
@@ -63,6 +65,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -450,6 +453,20 @@ public class FSDirectory implements Closeable {
     }
   }

+  /**
+   * Tell the block manager to update the replication factors when delete
+   * happens. Deleting a file or a snapshot might decrease the replication
+   * factor of the blocks as the blocks are always replicated to the highest
+   * replication factor among all snapshots.
+   */
+  void updateReplicationFactor(Collection<UpdatedReplicationInfo> blocks) {
+    BlockManager bm = getBlockManager();
+    for (UpdatedReplicationInfo e : blocks) {
+      BlockInfo b = e.block();
+      bm.setReplication(b.getReplication(), e.targetReplication(), b);
+    }
+  }
+
   /** Updates namespace, storagespace and typespaces consumed for all
    * directories until the parent directory of file represented by path.
    *

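The new FSDirectory#updateReplicationFactor closes the loop: deletes collect UpdatedReplicationInfo entries during block collection, and the directory then asks the BlockManager to apply each one. A simplified, self-contained sketch of that flow (stand-in types, not the Hadoop classes):

import java.util.ArrayList;
import java.util.List;

public class UpdateReplicationOnDeleteSketch {
  // Stand-in for INode.BlocksMapUpdateInfo.UpdatedReplicationInfo.
  static class UpdatedReplicationInfo {
    final short targetReplication;
    final String block;

    UpdatedReplicationInfo(short targetReplication, String block) {
      this.targetReplication = targetReplication;
      this.block = block;
    }
  }

  public static void main(String[] args) {
    // Collected while a file or snapshot is deleted: a surviving snapshot
    // may require less replication than the block currently has.
    List<UpdatedReplicationInfo> toUpdate = new ArrayList<>();
    toUpdate.add(new UpdatedReplicationInfo((short) 2, "blk_1"));

    // updateReplicationFactor hands each entry to the BlockManager, which
    // schedules the now-excess replicas for removal.
    for (UpdatedReplicationInfo e : toUpdate) {
      System.out.println("setReplication -> " + e.targetReplication
          + " for " + e.block);
    }
  }
}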
View File: FSEditLogLoader.java

@@ -514,7 +514,7 @@ public class FSEditLogLoader {
       short replication = fsNamesys.getBlockManager().adjustReplication(
           setReplicationOp.replication);
       FSDirAttrOp.unprotectedSetReplication(fsDir, renameReservedPathsOnUpgrade(
-          setReplicationOp.path, logVersion), replication, null);
+          setReplicationOp.path, logVersion), replication);
       break;
     }
     case OP_CONCAT_DELETE: {
@@ -1058,7 +1058,7 @@ public class FSEditLogLoader {
           // versions of Hadoop. Current versions always log
           // OP_ADD operations as each block is allocated.
           newBI = new BlockInfoContiguous(newBlock,
-              file.getPreferredBlockReplication());
+              file.getFileReplication());
         }
         fsNamesys.getBlockManager().addBlockCollection(newBI, file);
         file.addBlock(newBI);

View File: INode.java

@@ -901,15 +901,14 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
     /**
      * @param bsps
      *          block storage policy suite to calculate intended storage type
      *          usage
      * @param collectedBlocks
      *          blocks collected from the descents for further block
      *          deletion/update will be added to the given map.
      * @param removedINodes
      *          INodes collected from the descents for further cleaning up of
      * @param removedUCFiles
-     *          files that the NN need to remove the leases
      */
     public ReclaimContext(
         BlockStoragePolicySuite bsps, BlocksMapUpdateInfo collectedBlocks,
@@ -947,13 +946,44 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    * Information used for updating the blocksMap when deleting files.
    */
   public static class BlocksMapUpdateInfo {
+    /**
+     * The blocks whose replication factor need to be updated.
+     */
+    public static class UpdatedReplicationInfo {
+      /**
+       * the expected replication after the update.
+       */
+      private final short targetReplication;
+      /**
+       * The block whose replication needs to be updated.
+       */
+      private final BlockInfo block;
+
+      public UpdatedReplicationInfo(short targetReplication, BlockInfo block) {
+        this.targetReplication = targetReplication;
+        this.block = block;
+      }
+
+      public BlockInfo block() {
+        return block;
+      }
+
+      public short targetReplication() {
+        return targetReplication;
+      }
+    }
+
     /**
      * The list of blocks that need to be removed from blocksMap
      */
     private final List<BlockInfo> toDeleteList;
+    /**
+     * The list of blocks whose replication factor needs to be adjusted
+     */
+    private final List<UpdatedReplicationInfo> toUpdateReplicationInfo;

     public BlocksMapUpdateInfo() {
       toDeleteList = new ChunkedArrayList<>();
+      toUpdateReplicationInfo = new ChunkedArrayList<>();
     }

     /**
@@ -962,7 +992,11 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
     public List<BlockInfo> getToDeleteList() {
       return toDeleteList;
     }

+    public List<UpdatedReplicationInfo> toUpdateReplicationInfo() {
+      return toUpdateReplicationInfo;
+    }
+
     /**
      * Add a to-be-deleted block into the
      * {@link BlocksMapUpdateInfo#toDeleteList}
@@ -978,6 +1012,10 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
       toDeleteList.remove(block);
     }

+    public void addUpdateReplicationFactor(BlockInfo block, short targetRepl) {
+      toUpdateReplicationInfo.add(
+          new UpdatedReplicationInfo(targetRepl, block));
+    }
     /**
      * Clear {@link BlocksMapUpdateInfo#toDeleteList}
      */

View File: INodeFile.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -353,12 +354,11 @@ public class INodeFile extends INodeWithAdditionalFields
     return getFileReplication(CURRENT_STATE_ID);
   }

-  @Override // BlockCollection
   public short getPreferredBlockReplication() {
     short max = getFileReplication(CURRENT_STATE_ID);
     FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature();
     if (sf != null) {
-      short maxInSnapshot = sf.getMaxBlockRepInDiffs();
+      short maxInSnapshot = sf.getMaxBlockRepInDiffs(null);
       if (sf.isCurrentFileDeleted()) {
         return maxInSnapshot;
       }
@@ -439,19 +439,10 @@ public class INodeFile extends INodeWithAdditionalFields
     return (snapshotBlocks == null) ? getBlocks() : snapshotBlocks;
   }

-  /** Used during concat to update the BlockCollection for each block. */
-  private void updateBlockCollection() {
-    if (blocks != null) {
-      for(BlockInfo b : blocks) {
-        b.setBlockCollection(this);
-      }
-    }
-  }
-
   /**
    * append array of blocks to this.blocks
    */
-  void concatBlocks(INodeFile[] inodes) {
+  void concatBlocks(INodeFile[] inodes, BlockManager bm) {
     int size = this.blocks.length;
     int totalAddedBlocks = 0;
     for(INodeFile f : inodes) {
@@ -468,7 +459,14 @@ public class INodeFile extends INodeWithAdditionalFields
     }

     setBlocks(newlist);
-    updateBlockCollection();
+    for(BlockInfo b : blocks) {
+      b.setBlockCollection(this);
+      short oldRepl = b.getReplication();
+      short repl = getPreferredBlockReplication();
+      if (oldRepl != repl) {
+        bm.setReplication(oldRepl, repl, b);
+      }
+    }
   }

   /**
@@ -857,10 +855,9 @@ public class INodeFile extends INodeWithAdditionalFields
         truncatedBytes -= bi.getNumBytes();
       }

-      delta.addStorageSpace(-truncatedBytes * getPreferredBlockReplication());
+      delta.addStorageSpace(-truncatedBytes * bi.getReplication());
       if (bsps != null) {
-        List<StorageType> types = bsps.chooseStorageTypes(
-            getPreferredBlockReplication());
+        List<StorageType> types = bsps.chooseStorageTypes(bi.getReplication());
         for (StorageType t : types) {
           if (t.supportTypeQuota()) {
             delta.addTypeSpace(t, -truncatedBytes);

View File: NamenodeFsck.java

@@ -254,8 +254,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
     NumberReplicas numberReplicas= bm.countNodes(blockInfo);
     out.println("Block Id: " + blockId);
     out.println("Block belongs to: "+iNode.getFullPathName());
-    out.println("No. of Expected Replica: " +
-        bc.getPreferredBlockReplication());
+    if (blockInfo != null) {
+      out.println("No. of Expected Replica: " + blockInfo.getReplication());
+    }
     out.println("No. of live Replica: " + numberReplicas.liveReplicas());
     out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
     out.println("No. of stale Replica: " +

View File: FSImageFormatPBSnapshot.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.AclEntryStatusFormat;
 import org.apache.hadoop.hdfs.server.namenode.AclFeature;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@@ -208,6 +209,7 @@ public class FSImageFormatPBSnapshot {
       throws IOException {
     final FileDiffList diffs = new FileDiffList();
     final LoaderContext state = parent.getLoaderContext();
+    final BlockManager bm = fsn.getBlockManager();
     for (int i = 0; i < size; i++) {
       SnapshotDiffSection.FileDiff pbf = SnapshotDiffSection.FileDiff
           .parseDelimitedFrom(in);
@@ -243,9 +245,9 @@ public class FSImageFormatPBSnapshot {
         BlockInfo[] blocks = new BlockInfo[bpl.size()];
         for(int j = 0, e = bpl.size(); j < e; ++j) {
           Block blk = PBHelper.convert(bpl.get(j));
-          BlockInfo storedBlock = fsn.getBlockManager().getStoredBlock(blk);
+          BlockInfo storedBlock = bm.getStoredBlock(blk);
           if(storedBlock == null) {
-            storedBlock = fsn.getBlockManager().addBlockCollection(
+            storedBlock = bm.addBlockCollection(
                 new BlockInfoContiguous(blk, copy.getFileReplication()), file);
           }
           blocks[j] = storedBlock;
@@ -256,6 +258,12 @@ public class FSImageFormatPBSnapshot {
       diffs.addFirst(diff);
     }
     file.addSnapshotFeature(diffs);
+    short repl = file.getPreferredBlockReplication();
+    for (BlockInfo b : file.getBlocks()) {
+      if (b.getReplication() < repl) {
+        bm.setReplication(b.getReplication(), repl, b);
+      }
+    }
   }

   /** Load the created list in a DirectoryDiff */

View File: FileWithSnapshotFeature.java

@@ -65,10 +65,10 @@ public class FileWithSnapshotFeature implements INode.Feature {
   }

   /** @return the max replication factor in diffs */
-  public short getMaxBlockRepInDiffs() {
+  public short getMaxBlockRepInDiffs(FileDiff excluded) {
     short max = 0;
     for(FileDiff d : getDiffs()) {
-      if (d.snapshotINode != null) {
+      if (d != excluded && d.snapshotINode != null) {
         final short replication = d.snapshotINode.getFileReplication();
         if (replication > max) {
           max = replication;
@@ -147,28 +147,27 @@ public class FileWithSnapshotFeature implements INode.Feature {
     byte storagePolicyID = file.getStoragePolicyID();
     BlockStoragePolicy bsp = null;
     if (storagePolicyID != HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
-      bsp = reclaimContext.storagePolicySuite().getPolicy(file.getStoragePolicyID());
+      bsp = reclaimContext.storagePolicySuite().
+          getPolicy(file.getStoragePolicyID());
     }

-    QuotaCounts oldCounts = file.storagespaceConsumed(null);
-    long oldStoragespace;
+    QuotaCounts oldCounts;
     if (removed.snapshotINode != null) {
-      short replication = removed.snapshotINode.getFileReplication();
-      short currentRepl = file.getPreferredBlockReplication();
-      if (replication > currentRepl) {
-        long oldFileSizeNoRep = currentRepl == 0
-            ? file.computeFileSize(true, true)
-            : oldCounts.getStorageSpace() /
-            file.getPreferredBlockReplication();
-        oldStoragespace = oldFileSizeNoRep * replication;
-        oldCounts.setStorageSpace(oldStoragespace);
+      oldCounts = new QuotaCounts.Builder().build();
+      BlockInfo[] blocks = file.getBlocks() == null ? new
+          BlockInfo[0] : file.getBlocks();
+      for (BlockInfo b: blocks) {
+        short replication = b.getReplication();
+        long blockSize = b.isComplete() ? b.getNumBytes() : file
+            .getPreferredBlockSize();
+
+        oldCounts.addStorageSpace(blockSize * replication);
         if (bsp != null) {
           List<StorageType> oldTypeChosen = bsp.chooseStorageTypes(replication);
           for (StorageType t : oldTypeChosen) {
             if (t.supportTypeQuota()) {
-              oldCounts.addTypeSpace(t, oldFileSizeNoRep);
+              oldCounts.addTypeSpace(t, blockSize);
             }
           }
         }
@@ -178,10 +177,21 @@ public class FileWithSnapshotFeature implements INode.Feature {
       if (aclFeature != null) {
         AclStorage.removeAclFeature(aclFeature);
       }
+    } else {
+      oldCounts = file.storagespaceConsumed(null);
     }

     getDiffs().combineAndCollectSnapshotBlocks(reclaimContext, file, removed);
+    if (file.getBlocks() != null) {
+      short replInDiff = getMaxBlockRepInDiffs(removed);
+      short repl = (short) Math.max(file.getPreferredBlockReplication(),
+          replInDiff);
+      for (BlockInfo b : file.getBlocks()) {
+        if (repl != b.getReplication()) {
+          reclaimContext.collectedBlocks().addUpdateReplicationFactor(b, repl);
+        }
+      }
+    }

     QuotaCounts current = file.storagespaceConsumed(bsp);
     reclaimContext.quotaDelta().add(oldCounts.subtract(current));
   }

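FileWithSnapshotFeature#updateQuotaAndCollectBlocks now recomputes each block's target as the maximum of the file's current replication and the largest replication recorded in the remaining diffs, excluding the diff being removed. A runnable sketch of that rule (illustrative helper mirroring getMaxBlockRepInDiffs(excluded) plus the Math.max in the diff):

import java.util.Arrays;
import java.util.List;

public class SnapshotTargetReplicationSketch {
  // Target = max(current file replication, replication recorded in every
  // remaining diff); the diff being deleted is excluded from the scan.
  static short target(short currentRepl, List<Short> diffRepls, int excluded) {
    short max = currentRepl;
    for (int i = 0; i < diffRepls.size(); i++) {
      if (i != excluded && diffRepls.get(i) > max) {
        max = diffRepls.get(i);
      }
    }
    return max;
  }

  public static void main(String[] args) {
    // File now at replication 1; snapshots recorded 3 and 2. Deleting the
    // diff that recorded 3 lets every block drop to max(1, 2) = 2.
    List<Short> diffs = Arrays.asList((short) 3, (short) 2);
    System.out.println(target((short) 1, diffs, 0));  // prints 2
  }
}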
View File: TestBlockManager.java

@@ -434,7 +434,6 @@ public class TestBlockManager {
   private BlockInfo addBlockOnNodes(long blockId, List<DatanodeDescriptor> nodes) {
     BlockCollection bc = Mockito.mock(BlockCollection.class);
-    Mockito.doReturn((short)3).when(bc).getPreferredBlockReplication();

     BlockInfo blockInfo = blockOnNodes(blockId, nodes);
     bm.blocksMap.addBlockCollection(blockInfo, bc);
@@ -741,7 +740,6 @@ public class TestBlockManager {
     BlockInfo blockInfo =
         new BlockInfoContiguous(block, (short) 3);
     BlockCollection bc = Mockito.mock(BlockCollection.class);
-    Mockito.doReturn((short) 3).when(bc).getPreferredBlockReplication();
     bm.blocksMap.addBlockCollection(blockInfo, bc);
     return blockInfo;
   }
@@ -751,7 +749,6 @@ public class TestBlockManager {
     BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
     blockInfo.convertToBlockUnderConstruction(UNDER_CONSTRUCTION, null);
     BlockCollection bc = Mockito.mock(BlockCollection.class);
-    Mockito.doReturn((short) 3).when(bc).getPreferredBlockReplication();
     bm.blocksMap.addBlockCollection(blockInfo, bc);
     return blockInfo;
   }

View File: TestPendingReplication.java

@@ -190,7 +190,6 @@ public class TestPendingReplication {
         DatanodeStorageInfo.toDatanodeDescriptors(
             DFSTestUtil.createDatanodeStorageInfos(1)));
     BlockCollection bc = Mockito.mock(BlockCollection.class);
-    Mockito.doReturn((short) 3).when(bc).getPreferredBlockReplication();
     // Place into blocksmap with GenerationStamp = 1
     blockInfo.setGenerationStamp(1);
     blocksMap.addBlockCollection(blockInfo, bc);

View File: TestReplicationPolicy.java

@@ -1225,7 +1225,6 @@ public class TestReplicationPolicy {
     BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1);
     info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, null);
     BlockCollection bc = mock(BlockCollection.class);
-    when(bc.getPreferredBlockReplication()).thenReturn((short)1);
     bm.addBlockCollection(info, bc);

     // Adding this block will increase its current replication, and that will
@@ -1269,7 +1268,6 @@ public class TestReplicationPolicy {
     final BlockCollection mbc = mock(BlockCollection.class);
     when(mbc.getLastBlock()).thenReturn(info);
     when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
-    when(mbc.getPreferredBlockReplication()).thenReturn((short)1);
     when(mbc.isUnderConstruction()).thenReturn(true);
     ContentSummary cs = mock(ContentSummary.class);
     when(cs.getLength()).thenReturn((long)1);
@@ -1326,7 +1324,7 @@ public class TestReplicationPolicy {
     chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
     assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);

-    bm.setReplication((short)0, (short)1, "", block1);
+    bm.setReplication((short)0, (short)1, block1);

     // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
     // from QUEUE_VERY_UNDER_REPLICATED.

View File: TestINodeFile.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.IOUtils;
@@ -271,8 +273,9 @@ public class TestINodeFile {
     INodeFile origFile = createINodeFiles(1, "origfile")[0];
     assertEquals("Number of blocks didn't match", origFile.numBlocks(), 1L);

     INodeFile[] appendFiles = createINodeFiles(4, "appendfile");
-    origFile.concatBlocks(appendFiles);
+    BlockManager bm = Mockito.mock(BlockManager.class);
+    origFile.concatBlocks(appendFiles, bm);
     assertEquals("Number of blocks didn't match", origFile.numBlocks(), 5L);
   }

View File: TestFileWithSnapshotFeature.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -53,12 +54,16 @@ public class TestFileWithSnapshotFeature {
     BlockInfo[] blocks = new BlockInfo[] {
         new BlockInfoContiguous(new Block(1, BLOCK_SIZE, 1), REPL_1)
     };
+    BlockManager bm = mock(BlockManager.class);

     // No snapshot
     INodeFile file = mock(INodeFile.class);
     when(file.getFileWithSnapshotFeature()).thenReturn(sf);
     when(file.getBlocks()).thenReturn(blocks);
     when(file.getStoragePolicyID()).thenReturn((byte) 1);
+    Whitebox.setInternalState(file, "header", (long) REPL_1 << 48);
+    when(file.getPreferredBlockReplication()).thenReturn(REPL_1);
+
     when(bsps.getPolicy(anyByte())).thenReturn(bsp);
     INode.BlocksMapUpdateInfo collectedBlocks = mock(
         INode.BlocksMapUpdateInfo.class);
@@ -72,7 +77,6 @@ public class TestFileWithSnapshotFeature {

     // INode only exists in the snapshot
     INodeFile snapshotINode = mock(INodeFile.class);
-    when(file.getPreferredBlockReplication()).thenReturn(REPL_1);
     Whitebox.setInternalState(snapshotINode, "header", (long) REPL_3 << 48);
     Whitebox.setInternalState(diff, "snapshotINode", snapshotINode);
     when(diff.getSnapshotINode()).thenReturn(snapshotINode);
@@ -81,6 +85,7 @@ public class TestFileWithSnapshotFeature {
         .thenReturn(Lists.newArrayList(SSD));
     when(bsp.chooseStorageTypes(REPL_3))
         .thenReturn(Lists.newArrayList(DISK));
+    blocks[0].setReplication(REPL_3);
     sf.updateQuotaAndCollectBlocks(ctx, file, diff);
     counts = ctx.quotaDelta().getCountsCopy();
     Assert.assertEquals((REPL_3 - REPL_1) * BLOCK_SIZE,

View File: TestSnapshotDeletion.java

@@ -782,7 +782,7 @@ public class TestSnapshotDeletion {
     // modify file10, to check if the posterior diff was set correctly
     hdfs.setReplication(file10, REPLICATION);
     checkQuotaUsageComputation(snapshotRoot, dirNodeNum + 7, 20 * BLOCKSIZE);

     Path file10_s1 = SnapshotTestHelper.getSnapshotPath(snapshotRoot, "s1",
         modDirStr + "file10");
     Path file11_s1 = SnapshotTestHelper.getSnapshotPath(snapshotRoot, "s1",
@@ -830,7 +830,7 @@ public class TestSnapshotDeletion {
         blockmanager);
     TestSnapshotBlocksMap.assertBlockCollection(file13_s1.toString(), 1, fsdir,
         blockmanager);

     // make sure file14 and file15 are not included in s1
     Path file14_s1 = SnapshotTestHelper.getSnapshotPath(snapshotRoot, "s1",
         modDirStr + "file14");
@@ -841,14 +841,18 @@ public class TestSnapshotDeletion {
     for (BlockInfo b : blocks_14) {
       assertNull(blockmanager.getBlockCollection(b));
     }

     INodeFile nodeFile13 = (INodeFile) fsdir.getINode(file13.toString());
-    assertEquals(REPLICATION_1, nodeFile13.getPreferredBlockReplication());
+    for (BlockInfo b: nodeFile13.getBlocks()) {
+      assertEquals(REPLICATION_1, b.getReplication());
+    }
     TestSnapshotBlocksMap.assertBlockCollection(file13.toString(), 1, fsdir,
         blockmanager);

     INodeFile nodeFile12 = (INodeFile) fsdir.getINode(file12_s1.toString());
-    assertEquals(REPLICATION_1, nodeFile12.getPreferredBlockReplication());
+    for (BlockInfo b: nodeFile12.getBlocks()) {
+      assertEquals(REPLICATION_1, b.getReplication());
+    }
   }

   /** Test deleting snapshots with modification on the metadata of directory */

View File: TestSnapshotReplication.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
@@ -38,10 +39,9 @@ import org.junit.Before;
 import org.junit.Test;

 /**
- * This class tests the replication handling/calculation of snapshots. In
- * particular, {@link INodeFile#getFileReplication()} and
- * {@link INodeFile#getPreferredBlockReplication()} are tested to make sure
- * the number of replication is calculated correctly with/without snapshots.
+ * This class tests the replication handling/calculation of snapshots to make
+ * sure the number of replication is calculated correctly with/without
+ * snapshots.
  */
 public class TestSnapshotReplication {

@@ -79,9 +79,7 @@ public class TestSnapshotReplication {
   }

   /**
-   * Check the replication of a given file. We test both
-   * {@link INodeFile#getFileReplication()} and
-   * {@link INodeFile#getPreferredBlockReplication()}.
+   * Check the replication of a given file.
    *
    * @param file The given file
    * @param replication The expected replication number
@@ -98,8 +96,9 @@ public class TestSnapshotReplication {
     // Check the correctness of getPreferredBlockReplication()
     INode inode = fsdir.getINode(file1.toString());
     assertTrue(inode instanceof INodeFile);
-    assertEquals(blockReplication,
-        ((INodeFile) inode).getPreferredBlockReplication());
+    for (BlockInfo b: inode.asFile().getBlocks()) {
+      assertEquals(blockReplication, b.getReplication());
+    }
   }

   /**
@@ -141,8 +140,9 @@ public class TestSnapshotReplication {
     // First check the getPreferredBlockReplication for the INode of
     // the currentFile
     final INodeFile inodeOfCurrentFile = getINodeFile(currentFile);
-    assertEquals(expectedBlockRep,
-        inodeOfCurrentFile.getPreferredBlockReplication());
+    for (BlockInfo b : inodeOfCurrentFile.getBlocks()) {
+      assertEquals(expectedBlockRep, b.getReplication());
+    }
     // Then check replication for every snapshot
     for (Path ss : snapshotRepMap.keySet()) {
       final INodesInPath iip = fsdir.getINodesInPath(ss.toString(), true);
@@ -150,7 +150,9 @@ public class TestSnapshotReplication {
       // The replication number derived from the
       // INodeFileWithLink#getPreferredBlockReplication should
       // always == expectedBlockRep
-      assertEquals(expectedBlockRep, ssInode.getPreferredBlockReplication());
+      for (BlockInfo b : ssInode.getBlocks()) {
+        assertEquals(expectedBlockRep, b.getReplication());
+      }
       // Also check the number derived from INodeFile#getFileReplication
       assertEquals(snapshotRepMap.get(ss).shortValue(),
           ssInode.getFileReplication(iip.getPathSnapshotId()));
@@ -224,7 +226,10 @@ public class TestSnapshotReplication {
       // The replication number derived from the
       // INodeFileWithLink#getPreferredBlockReplication should
       // always == expectedBlockRep
-      assertEquals(REPLICATION, ssInode.getPreferredBlockReplication());
+      for (BlockInfo b : ssInode.getBlocks()) {
+        assertEquals(REPLICATION, b.getReplication());
+      }
       // Also check the number derived from INodeFile#getFileReplication
       assertEquals(snapshotRepMap.get(ss).shortValue(),
           ssInode.getFileReplication());