svn merge -c 1398288 from trunk for HDFS-4037. Rename the getReplication() method in BlockCollection to getBlockReplication().

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1398289 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-10-15 13:55:18 +00:00
parent 5e2b9b88ef
commit 1f109df2c6
14 changed files with 42 additions and 37 deletions

View File

@ -19,8 +19,8 @@ Release 2.0.3-alpha - Unreleased
HDFS-3939. NN RPC address cleanup. (eli) HDFS-3939. NN RPC address cleanup. (eli)
HDFS-3373. Change DFSClient input stream socket cache to global static and add HDFS-3373. Change DFSClient input stream socket cache to global static and
a thread to cleanup expired cache entries. (John George via szetszwo) add a thread to cleanup expired cache entries. (John George via szetszwo)
HDFS-3896. Add descriptions for dfs.namenode.rpc-address and HDFS-3896. Add descriptions for dfs.namenode.rpc-address and
dfs.namenode.servicerpc-address to hdfs-default.xml. (Jeff Lord via atm) dfs.namenode.servicerpc-address to hdfs-default.xml. (Jeff Lord via atm)
@ -51,6 +51,9 @@ Release 2.0.3-alpha - Unreleased
HADOOP-8911. CRLF characters in source and text files. HADOOP-8911. CRLF characters in source and text files.
(Raja Aluri via suresh) (Raja Aluri via suresh)
HDFS-4037. Rename the getReplication() method in BlockCollection to
getBlockReplication(). (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -19,12 +19,14 @@
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
/** /**
* This interface is used by the block manager to expose a * This interface is used by the block manager to expose a
* few characteristics of a collection of Block/BlockUnderConstruction. * few characteristics of a collection of Block/BlockUnderConstruction.
*/ */
@InterfaceAudience.Private
public interface BlockCollection { public interface BlockCollection {
/** /**
* Get the last block of the collection. * Get the last block of the collection.
@ -56,7 +58,7 @@ public interface BlockCollection {
* Get block replication for the collection * Get block replication for the collection
* @return block replication value * @return block replication value
*/ */
public short getReplication(); public short getBlockReplication();
/** /**
* Get the name of the collection. * Get the name of the collection.

View File

@ -68,7 +68,7 @@ public BlockInfo(Block blk, int replication) {
* @param from BlockInfo to copy from. * @param from BlockInfo to copy from.
*/ */
protected BlockInfo(BlockInfo from) { protected BlockInfo(BlockInfo from) {
this(from, from.bc.getReplication()); this(from, from.bc.getBlockReplication());
this.bc = from.bc; this.bc = from.bc;
} }
@ -344,7 +344,7 @@ public BlockInfoUnderConstruction convertToBlockUnderConstruction(
BlockUCState s, DatanodeDescriptor[] targets) { BlockUCState s, DatanodeDescriptor[] targets) {
if(isComplete()) { if(isComplete()) {
return new BlockInfoUnderConstruction( return new BlockInfoUnderConstruction(
this, getBlockCollection().getReplication(), s, targets); this, getBlockCollection().getBlockReplication(), s, targets);
} }
// the block is already under construction // the block is already under construction
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this; BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;

View File

@ -998,7 +998,7 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
// Add this replica to corruptReplicas Map // Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason); corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
if (countNodes(b.stored).liveReplicas() >= bc.getReplication()) { if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
// the block is over-replicated so invalidate the replicas immediately // the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node); invalidateBlock(b, node);
} else if (namesystem.isPopulatingReplQueues()) { } else if (namesystem.isPopulatingReplQueues()) {
@ -1136,7 +1136,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
continue; continue;
} }
requiredReplication = bc.getReplication(); requiredReplication = bc.getBlockReplication();
// get a source data-node // get a source data-node
containingNodes = new ArrayList<DatanodeDescriptor>(); containingNodes = new ArrayList<DatanodeDescriptor>();
@ -1222,7 +1222,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
neededReplications.decrementReplicationIndex(priority); neededReplications.decrementReplicationIndex(priority);
continue; continue;
} }
requiredReplication = bc.getReplication(); requiredReplication = bc.getBlockReplication();
// do not schedule more if enough replicas is already pending // do not schedule more if enough replicas is already pending
NumberReplicas numReplicas = countNodes(block); NumberReplicas numReplicas = countNodes(block);
@ -2090,7 +2090,7 @@ private Block addStoredBlock(final BlockInfo block,
} }
// handle underReplication/overReplication // handle underReplication/overReplication
short fileReplication = bc.getReplication(); short fileReplication = bc.getBlockReplication();
if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) { if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
neededReplications.remove(storedBlock, numCurrentReplica, neededReplications.remove(storedBlock, numCurrentReplica,
num.decommissionedReplicas(), fileReplication); num.decommissionedReplicas(), fileReplication);
@ -2229,7 +2229,7 @@ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
return MisReplicationResult.UNDER_CONSTRUCTION; return MisReplicationResult.UNDER_CONSTRUCTION;
} }
// calculate current replication // calculate current replication
short expectedReplication = bc.getReplication(); short expectedReplication = bc.getBlockReplication();
NumberReplicas num = countNodes(block); NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas(); int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be // add to under-replicated queue if need to be
@ -2728,7 +2728,7 @@ void processOverReplicatedBlocksOnReCommission(
while(it.hasNext()) { while(it.hasNext()) {
final Block block = it.next(); final Block block = it.next();
BlockCollection bc = blocksMap.getBlockCollection(block); BlockCollection bc = blocksMap.getBlockCollection(block);
short expectedReplication = bc.getReplication(); short expectedReplication = bc.getBlockReplication();
NumberReplicas num = countNodes(block); NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas(); int numCurrentReplica = num.liveReplicas();
if (numCurrentReplica > expectedReplication) { if (numCurrentReplica > expectedReplication) {
@ -2874,7 +2874,7 @@ private int getReplication(Block block) {
if (bc == null) { // block does not belong to any file if (bc == null) { // block does not belong to any file
return 0; return 0;
} }
return bc.getReplication(); return bc.getBlockReplication();
} }

View File

@ -345,13 +345,13 @@ BlockInfo addBlock(String path,
// check quota limits and updated space consumed // check quota limits and updated space consumed
updateCount(inodes, inodes.length-1, 0, updateCount(inodes, inodes.length-1, 0,
fileINode.getPreferredBlockSize()*fileINode.getReplication(), true); fileINode.getPreferredBlockSize()*fileINode.getBlockReplication(), true);
// associate new last block for the file // associate new last block for the file
BlockInfoUnderConstruction blockInfo = BlockInfoUnderConstruction blockInfo =
new BlockInfoUnderConstruction( new BlockInfoUnderConstruction(
block, block,
fileINode.getReplication(), fileINode.getBlockReplication(),
BlockUCState.UNDER_CONSTRUCTION, BlockUCState.UNDER_CONSTRUCTION,
targets); targets);
getBlockManager().addBlockCollection(blockInfo, fileINode); getBlockManager().addBlockCollection(blockInfo, fileINode);
@ -442,7 +442,7 @@ void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode,
// update space consumed // update space consumed
INode[] pathINodes = getExistingPathINodes(path); INode[] pathINodes = getExistingPathINodes(path);
updateCount(pathINodes, pathINodes.length-1, 0, updateCount(pathINodes, pathINodes.length-1, 0,
-fileNode.getPreferredBlockSize()*fileNode.getReplication(), true); -fileNode.getPreferredBlockSize()*fileNode.getBlockReplication(), true);
} }
/** /**
@ -821,7 +821,7 @@ Block[] unprotectedSetReplication(String src,
return null; return null;
} }
INodeFile fileNode = (INodeFile)inode; INodeFile fileNode = (INodeFile)inode;
final short oldRepl = fileNode.getReplication(); final short oldRepl = fileNode.getBlockReplication();
// check disk quota // check disk quota
long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl); long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
@ -2061,7 +2061,7 @@ private HdfsFileStatus createFileStatus(byte[] path, INode node) {
if (node instanceof INodeFile) { if (node instanceof INodeFile) {
INodeFile fileNode = (INodeFile)node; INodeFile fileNode = (INodeFile)node;
size = fileNode.computeFileSize(true); size = fileNode.computeFileSize(true);
replication = fileNode.getReplication(); replication = fileNode.getBlockReplication();
blocksize = fileNode.getPreferredBlockSize(); blocksize = fileNode.getPreferredBlockSize();
} }
return new HdfsFileStatus( return new HdfsFileStatus(
@ -2091,7 +2091,7 @@ private HdfsLocatedFileStatus createLocatedFileStatus(
if (node instanceof INodeFile) { if (node instanceof INodeFile) {
INodeFile fileNode = (INodeFile)node; INodeFile fileNode = (INodeFile)node;
size = fileNode.computeFileSize(true); size = fileNode.computeFileSize(true);
replication = fileNode.getReplication(); replication = fileNode.getBlockReplication();
blocksize = fileNode.getPreferredBlockSize(); blocksize = fileNode.getPreferredBlockSize();
loc = getFSNamesystem().getBlockManager().createLocatedBlocks( loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
fileNode.getBlocks(), fileNode.computeFileSize(false), fileNode.getBlocks(), fileNode.computeFileSize(false),

View File

@ -602,7 +602,7 @@ private void printStatistics(boolean force) {
public void logOpenFile(String path, INodeFileUnderConstruction newNode) { public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
AddOp op = AddOp.getInstance(cache.get()) AddOp op = AddOp.getInstance(cache.get())
.setPath(path) .setPath(path)
.setReplication(newNode.getReplication()) .setReplication(newNode.getBlockReplication())
.setModificationTime(newNode.getModificationTime()) .setModificationTime(newNode.getModificationTime())
.setAccessTime(newNode.getAccessTime()) .setAccessTime(newNode.getAccessTime())
.setBlockSize(newNode.getPreferredBlockSize()) .setBlockSize(newNode.getPreferredBlockSize())
@ -620,7 +620,7 @@ public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
public void logCloseFile(String path, INodeFile newNode) { public void logCloseFile(String path, INodeFile newNode) {
CloseOp op = CloseOp.getInstance(cache.get()) CloseOp op = CloseOp.getInstance(cache.get())
.setPath(path) .setPath(path)
.setReplication(newNode.getReplication()) .setReplication(newNode.getBlockReplication())
.setModificationTime(newNode.getModificationTime()) .setModificationTime(newNode.getModificationTime())
.setAccessTime(newNode.getAccessTime()) .setAccessTime(newNode.getAccessTime())
.setBlockSize(newNode.getPreferredBlockSize()) .setBlockSize(newNode.getPreferredBlockSize())

View File

@ -594,13 +594,13 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
// what about an old-version fsync() where fsync isn't called // what about an old-version fsync() where fsync isn't called
// until several blocks in? // until several blocks in?
newBI = new BlockInfoUnderConstruction( newBI = new BlockInfoUnderConstruction(
newBlock, file.getReplication()); newBlock, file.getBlockReplication());
} else { } else {
// OP_CLOSE should add finalized blocks. This code path // OP_CLOSE should add finalized blocks. This code path
// is only executed when loading edits written by prior // is only executed when loading edits written by prior
// versions of Hadoop. Current versions always log // versions of Hadoop. Current versions always log
// OP_ADD operations as each block is allocated. // OP_ADD operations as each block is allocated.
newBI = new BlockInfo(newBlock, file.getReplication()); newBI = new BlockInfo(newBlock, file.getBlockReplication());
} }
fsNamesys.getBlockManager().addBlockCollection(newBI, file); fsNamesys.getBlockManager().addBlockCollection(newBI, file);
file.addBlock(newBI); file.addBlock(newBI);

View File

@ -126,7 +126,7 @@ static void writeINodeUnderConstruction(DataOutputStream out,
String path) String path)
throws IOException { throws IOException {
writeString(path, out); writeString(path, out);
out.writeShort(cons.getReplication()); out.writeShort(cons.getBlockReplication());
out.writeLong(cons.getModificationTime()); out.writeLong(cons.getModificationTime());
out.writeLong(cons.getPreferredBlockSize()); out.writeLong(cons.getPreferredBlockSize());
int nrBlocks = cons.getBlocks().length; int nrBlocks = cons.getBlocks().length;
@ -175,7 +175,7 @@ static void saveINode2Image(INode node,
filePerm); filePerm);
} else { } else {
INodeFile fileINode = (INodeFile)node; INodeFile fileINode = (INodeFile)node;
out.writeShort(fileINode.getReplication()); out.writeShort(fileINode.getBlockReplication());
out.writeLong(fileINode.getModificationTime()); out.writeLong(fileINode.getModificationTime());
out.writeLong(fileINode.getAccessTime()); out.writeLong(fileINode.getAccessTime());
out.writeLong(fileINode.getPreferredBlockSize()); out.writeLong(fileINode.getPreferredBlockSize());

View File

@ -1396,7 +1396,7 @@ private void concatInternal(String target, String [] srcs)
} }
si.add(trgInode); si.add(trgInode);
short repl = trgInode.getReplication(); short repl = trgInode.getBlockReplication();
// now check the srcs // now check the srcs
boolean endSrc = false; // final src file doesn't have to have full end block boolean endSrc = false; // final src file doesn't have to have full end block
@ -1416,10 +1416,10 @@ private void concatInternal(String target, String [] srcs)
} }
// check replication and blocks size // check replication and blocks size
if(repl != srcInode.getReplication()) { if(repl != srcInode.getBlockReplication()) {
throw new IllegalArgumentException(src + " and " + target + " " + throw new IllegalArgumentException(src + " and " + target + " " +
"should have same replication: " "should have same replication: "
+ repl + " vs. " + srcInode.getReplication()); + repl + " vs. " + srcInode.getBlockReplication());
} }
//boolean endBlock=false; //boolean endBlock=false;
@ -1862,7 +1862,7 @@ LocatedBlock prepareFileForWrite(String src, INodeFile file,
boolean writeToEditLog) throws IOException { boolean writeToEditLog) throws IOException {
INodeFileUnderConstruction cons = new INodeFileUnderConstruction( INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
file.getLocalNameBytes(), file.getLocalNameBytes(),
file.getReplication(), file.getBlockReplication(),
file.getModificationTime(), file.getModificationTime(),
file.getPreferredBlockSize(), file.getPreferredBlockSize(),
file.getBlocks(), file.getBlocks(),
@ -2176,7 +2176,7 @@ LocatedBlock getAdditionalBlock(String src,
fileLength = pendingFile.computeContentSummary().getLength(); fileLength = pendingFile.computeContentSummary().getLength();
blockSize = pendingFile.getPreferredBlockSize(); blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode(); clientNode = pendingFile.getClientNode();
replication = pendingFile.getReplication(); replication = pendingFile.getBlockReplication();
} finally { } finally {
writeUnlock(); writeUnlock();
} }
@ -2420,7 +2420,7 @@ private boolean completeFileInternal(String src,
* them into invalidateBlocks. * them into invalidateBlocks.
*/ */
private void checkReplicationFactor(INodeFile file) { private void checkReplicationFactor(INodeFile file) {
short numExpectedReplicas = file.getReplication(); short numExpectedReplicas = file.getBlockReplication();
Block[] pendingBlocks = file.getBlocks(); Block[] pendingBlocks = file.getBlocks();
int nrBlocks = pendingBlocks.length; int nrBlocks = pendingBlocks.length;
for (int i = 0; i < nrBlocks; i++) { for (int i = 0; i < nrBlocks; i++) {
@ -3139,7 +3139,7 @@ private void commitOrCompleteLastBlock(final INodeFileUnderConstruction fileINod
if (diff > 0) { if (diff > 0) {
try { try {
String path = leaseManager.findPath(fileINode); String path = leaseManager.findPath(fileINode);
dir.updateSpaceConsumed(path, 0, -diff * fileINode.getReplication()); dir.updateSpaceConsumed(path, 0, -diff * fileINode.getBlockReplication());
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unexpected exception while updating disk space.", e); LOG.warn("Unexpected exception while updating disk space.", e);
} }

View File

@ -71,7 +71,7 @@ boolean isDirectory() {
/** @return the replication factor of the file. */ /** @return the replication factor of the file. */
@Override @Override
public short getReplication() { public short getBlockReplication() {
return (short) ((header & HEADERMASK) >> BLOCKBITS); return (short) ((header & HEADERMASK) >> BLOCKBITS);
} }
@ -215,7 +215,7 @@ private long diskspaceConsumed(Block[] blkArr) {
isUnderConstruction()) { isUnderConstruction()) {
size += getPreferredBlockSize() - blkArr[blkArr.length-1].getNumBytes(); size += getPreferredBlockSize() - blkArr[blkArr.length-1].getNumBytes();
} }
return size * getReplication(); return size * getBlockReplication();
} }
/** /**

View File

@ -104,7 +104,7 @@ assert allBlocksComplete() :
"non-complete blocks! Blocks are: " + blocksAsString(); "non-complete blocks! Blocks are: " + blocksAsString();
INodeFile obj = new INodeFile(getPermissionStatus(), INodeFile obj = new INodeFile(getPermissionStatus(),
getBlocks(), getBlocks(),
getReplication(), getBlockReplication(),
getModificationTime(), getModificationTime(),
getModificationTime(), getModificationTime(),
getPreferredBlockSize()); getPreferredBlockSize());

View File

@ -785,7 +785,7 @@ public void toXML(XMLOutputter doc) throws IOException {
doc.endTag(); doc.endTag();
doc.startTag("replication"); doc.startTag("replication");
doc.pcdata(""+inode.getReplication()); doc.pcdata(""+inode.getBlockReplication());
doc.endTag(); doc.endTag();
doc.startTag("disk_space_consumed"); doc.startTag("disk_space_consumed");

View File

@ -379,7 +379,7 @@ private List<DatanodeDescriptor> startDecommission(int ... indexes) {
private BlockInfo addBlockOnNodes(long blockId, List<DatanodeDescriptor> nodes) { private BlockInfo addBlockOnNodes(long blockId, List<DatanodeDescriptor> nodes) {
BlockCollection bc = Mockito.mock(BlockCollection.class); BlockCollection bc = Mockito.mock(BlockCollection.class);
Mockito.doReturn((short)3).when(bc).getReplication(); Mockito.doReturn((short)3).when(bc).getBlockReplication();
BlockInfo blockInfo = blockOnNodes(blockId, nodes); BlockInfo blockInfo = blockOnNodes(blockId, nodes);
bm.blocksMap.addBlockCollection(blockInfo, bc); bm.blocksMap.addBlockCollection(blockInfo, bc);

View File

@ -48,7 +48,7 @@ public void testReplication () {
FsPermission.getDefault()), null, replication, FsPermission.getDefault()), null, replication,
0L, 0L, preferredBlockSize); 0L, 0L, preferredBlockSize);
assertEquals("True has to be returned in this case", replication, assertEquals("True has to be returned in this case", replication,
inf.getReplication()); inf.getBlockReplication());
} }
/** /**