HDFS-4873. Merging change r1491957 from trunk to branch-2.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1491964 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
13381835cc
commit
443578ccf7
|
@ -857,6 +857,9 @@ Release 2.1.0-beta - UNRELEASED
|
|||
HDFS-4877. Snapshot: fix the scenario where a directory is renamed under
|
||||
its prior descendant. (jing9)
|
||||
|
||||
HDFS-4873. callGetBlockLocations returns incorrect number of blocks for
|
||||
snapshotted files. (jing9)
|
||||
|
||||
Release 2.0.5-alpha - 06/06/2013
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -682,7 +682,7 @@ public class BlockManager {
|
|||
}
|
||||
return machineSet;
|
||||
}
|
||||
|
||||
|
||||
private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
|
||||
final long offset, final long length, final int nrBlocksToReturn,
|
||||
final AccessMode mode) throws IOException {
|
||||
|
@ -713,6 +713,22 @@ public class BlockManager {
|
|||
return results;
|
||||
}
|
||||
|
||||
private LocatedBlock createLocatedBlock(final BlockInfo[] blocks,
|
||||
final long endPos, final AccessMode mode) throws IOException {
|
||||
int curBlk = 0;
|
||||
long curPos = 0;
|
||||
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
|
||||
for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
|
||||
long blkSize = blocks[curBlk].getNumBytes();
|
||||
if (curPos + blkSize >= endPos) {
|
||||
break;
|
||||
}
|
||||
curPos += blkSize;
|
||||
}
|
||||
|
||||
return createLocatedBlock(blocks[curBlk], curPos, mode);
|
||||
}
|
||||
|
||||
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos,
|
||||
final BlockTokenSecretManager.AccessMode mode) throws IOException {
|
||||
final LocatedBlock lb = createLocatedBlock(blk, pos);
|
||||
|
@ -773,9 +789,9 @@ public class BlockManager {
|
|||
/** Create a LocatedBlocks. */
|
||||
public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
|
||||
final long fileSizeExcludeBlocksUnderConstruction,
|
||||
final boolean isFileUnderConstruction,
|
||||
final long offset, final long length, final boolean needBlockToken
|
||||
) throws IOException {
|
||||
final boolean isFileUnderConstruction, final long offset,
|
||||
final long length, final boolean needBlockToken, final boolean inSnapshot)
|
||||
throws IOException {
|
||||
assert namesystem.hasReadOrWriteLock();
|
||||
if (blocks == null) {
|
||||
return null;
|
||||
|
@ -790,14 +806,23 @@ public class BlockManager {
|
|||
final List<LocatedBlock> locatedblocks = createLocatedBlockList(
|
||||
blocks, offset, length, Integer.MAX_VALUE, mode);
|
||||
|
||||
final BlockInfo last = blocks[blocks.length - 1];
|
||||
final long lastPos = last.isComplete()?
|
||||
fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
|
||||
: fileSizeExcludeBlocksUnderConstruction;
|
||||
final LocatedBlock lastlb = createLocatedBlock(last, lastPos, mode);
|
||||
final LocatedBlock lastlb;
|
||||
final boolean isComplete;
|
||||
if (!inSnapshot) {
|
||||
final BlockInfo last = blocks[blocks.length - 1];
|
||||
final long lastPos = last.isComplete()?
|
||||
fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
|
||||
: fileSizeExcludeBlocksUnderConstruction;
|
||||
lastlb = createLocatedBlock(last, lastPos, mode);
|
||||
isComplete = last.isComplete();
|
||||
} else {
|
||||
lastlb = createLocatedBlock(blocks,
|
||||
fileSizeExcludeBlocksUnderConstruction, mode);
|
||||
isComplete = true;
|
||||
}
|
||||
return new LocatedBlocks(
|
||||
fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
|
||||
locatedblocks, lastlb, last.isComplete());
|
||||
locatedblocks, lastlb, isComplete);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2537,46 +2537,40 @@ public class FSDirectory implements Closeable {
|
|||
node.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create FileStatus with location info by file INode
|
||||
*/
|
||||
private HdfsLocatedFileStatus createLocatedFileStatus(
|
||||
byte[] path, INode node, Snapshot snapshot) throws IOException {
|
||||
assert hasReadLock();
|
||||
long size = 0; // length is zero for directories
|
||||
short replication = 0;
|
||||
long blocksize = 0;
|
||||
LocatedBlocks loc = null;
|
||||
if (node.isFile()) {
|
||||
final INodeFile fileNode = node.asFile();
|
||||
size = fileNode.computeFileSize(snapshot);
|
||||
replication = fileNode.getFileReplication(snapshot);
|
||||
blocksize = fileNode.getPreferredBlockSize();
|
||||
/**
|
||||
* Create FileStatus with location info by file INode
|
||||
*/
|
||||
private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path,
|
||||
INode node, Snapshot snapshot) throws IOException {
|
||||
assert hasReadLock();
|
||||
long size = 0; // length is zero for directories
|
||||
short replication = 0;
|
||||
long blocksize = 0;
|
||||
LocatedBlocks loc = null;
|
||||
if (node.isFile()) {
|
||||
final INodeFile fileNode = node.asFile();
|
||||
size = fileNode.computeFileSize(snapshot);
|
||||
replication = fileNode.getFileReplication(snapshot);
|
||||
blocksize = fileNode.getPreferredBlockSize();
|
||||
|
||||
final boolean isUc = fileNode.isUnderConstruction();
|
||||
final long fileSize = snapshot == null && isUc?
|
||||
fileNode.computeFileSizeNotIncludingLastUcBlock(): size;
|
||||
loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
|
||||
fileNode.getBlocks(), fileSize, isUc, 0L, size, false);
|
||||
if (loc==null) {
|
||||
loc = new LocatedBlocks();
|
||||
}
|
||||
}
|
||||
return new HdfsLocatedFileStatus(
|
||||
size,
|
||||
node.isDirectory(),
|
||||
replication,
|
||||
blocksize,
|
||||
node.getModificationTime(snapshot),
|
||||
node.getAccessTime(snapshot),
|
||||
node.getFsPermission(snapshot),
|
||||
node.getUserName(snapshot),
|
||||
node.getGroupName(snapshot),
|
||||
node.isSymlink() ? node.asSymlink().getSymlink() : null,
|
||||
path,
|
||||
node.getId(),
|
||||
loc);
|
||||
final boolean inSnapshot = snapshot != null;
|
||||
final boolean isUc = inSnapshot ? false : fileNode.isUnderConstruction();
|
||||
final long fileSize = !inSnapshot && isUc ?
|
||||
fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
|
||||
loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
|
||||
fileNode.getBlocks(), fileSize, isUc, 0L, size, false,
|
||||
inSnapshot);
|
||||
if (loc == null) {
|
||||
loc = new LocatedBlocks();
|
||||
}
|
||||
}
|
||||
return new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
|
||||
blocksize, node.getModificationTime(snapshot),
|
||||
node.getAccessTime(snapshot), node.getFsPermission(snapshot),
|
||||
node.getUserName(snapshot), node.getGroupName(snapshot),
|
||||
node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
|
||||
node.getId(), loc);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -179,6 +179,8 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
|
|||
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
|
||||
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
|
||||
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SnapshotDiffInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
|
||||
|
@ -1390,11 +1392,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
dir.setTimes(src, inode, -1, now, false, iip.getLatestSnapshot());
|
||||
}
|
||||
}
|
||||
final long fileSize = iip.getPathSnapshot() != null?
|
||||
final long fileSize = iip.isSnapshot() ?
|
||||
inode.computeFileSize(iip.getPathSnapshot())
|
||||
: inode.computeFileSizeNotIncludingLastUcBlock();
|
||||
boolean isUc = inode.isUnderConstruction();
|
||||
if (iip.isSnapshot()) {
|
||||
// if src indicates a snapshot file, we need to make sure the returned
|
||||
// blocks do not exceed the size of the snapshot file.
|
||||
length = Math.min(length, fileSize - offset);
|
||||
isUc = false;
|
||||
}
|
||||
return blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
|
||||
inode.isUnderConstruction(), offset, length, needBlockToken);
|
||||
isUc, offset, length, needBlockToken, iip.isSnapshot());
|
||||
} finally {
|
||||
if (isReadOp) {
|
||||
readUnlock();
|
||||
|
|
|
@ -18,22 +18,28 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode.snapshot;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSClientAdapter;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||
|
@ -182,5 +188,97 @@ public class TestINodeFileUnderConstructionWithSnapshot {
|
|||
|
||||
// re-check the size of nodeInDeleted_S1
|
||||
assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize(s1));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* call DFSClient#callGetBlockLocations(...) for snapshot file. Make sure only
|
||||
* blocks within the size range are returned.
|
||||
*/
|
||||
@Test
|
||||
public void testGetBlockLocations() throws Exception {
|
||||
final Path root = new Path("/");
|
||||
final Path file = new Path("/file");
|
||||
DFSTestUtil.createFile(hdfs, file, BLOCKSIZE, REPLICATION, seed);
|
||||
|
||||
// take a snapshot on root
|
||||
SnapshotTestHelper.createSnapshot(hdfs, root, "s1");
|
||||
|
||||
final Path fileInSnapshot = SnapshotTestHelper.getSnapshotPath(root,
|
||||
"s1", file.getName());
|
||||
FileStatus status = hdfs.getFileStatus(fileInSnapshot);
|
||||
// make sure we record the size for the file
|
||||
assertEquals(BLOCKSIZE, status.getLen());
|
||||
|
||||
// append data to file
|
||||
DFSTestUtil.appendFile(hdfs, file, BLOCKSIZE - 1);
|
||||
status = hdfs.getFileStatus(fileInSnapshot);
|
||||
// the size of snapshot file should still be BLOCKSIZE
|
||||
assertEquals(BLOCKSIZE, status.getLen());
|
||||
// the size of the file should be (2 * BLOCKSIZE - 1)
|
||||
status = hdfs.getFileStatus(file);
|
||||
assertEquals(BLOCKSIZE * 2 - 1, status.getLen());
|
||||
|
||||
// call DFSClient#callGetBlockLocations for the file in snapshot
|
||||
LocatedBlocks blocks = DFSClientAdapter.callGetBlockLocations(
|
||||
cluster.getNameNodeRpc(), fileInSnapshot.toString(), 0, Long.MAX_VALUE);
|
||||
List<LocatedBlock> blockList = blocks.getLocatedBlocks();
|
||||
|
||||
// should be only one block
|
||||
assertEquals(BLOCKSIZE, blocks.getFileLength());
|
||||
assertEquals(1, blockList.size());
|
||||
|
||||
// check the last block
|
||||
LocatedBlock lastBlock = blocks.getLastLocatedBlock();
|
||||
assertEquals(0, lastBlock.getStartOffset());
|
||||
assertEquals(BLOCKSIZE, lastBlock.getBlockSize());
|
||||
|
||||
// take another snapshot
|
||||
SnapshotTestHelper.createSnapshot(hdfs, root, "s2");
|
||||
final Path fileInSnapshot2 = SnapshotTestHelper.getSnapshotPath(root,
|
||||
"s2", file.getName());
|
||||
|
||||
// append data to file without closing
|
||||
HdfsDataOutputStream out = appendFileWithoutClosing(file, BLOCKSIZE);
|
||||
out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
|
||||
|
||||
status = hdfs.getFileStatus(fileInSnapshot2);
|
||||
// the size of snapshot file should be BLOCKSIZE*2-1
|
||||
assertEquals(BLOCKSIZE * 2 - 1, status.getLen());
|
||||
// the size of the file should be (3 * BLOCKSIZE - 1)
|
||||
status = hdfs.getFileStatus(file);
|
||||
assertEquals(BLOCKSIZE * 3 - 1, status.getLen());
|
||||
|
||||
blocks = DFSClientAdapter.callGetBlockLocations(cluster.getNameNodeRpc(),
|
||||
fileInSnapshot2.toString(), 0, Long.MAX_VALUE);
|
||||
assertFalse(blocks.isUnderConstruction());
|
||||
assertTrue(blocks.isLastBlockComplete());
|
||||
blockList = blocks.getLocatedBlocks();
|
||||
|
||||
// should be 2 blocks
|
||||
assertEquals(BLOCKSIZE * 2 - 1, blocks.getFileLength());
|
||||
assertEquals(2, blockList.size());
|
||||
|
||||
// check the last block
|
||||
lastBlock = blocks.getLastLocatedBlock();
|
||||
assertEquals(BLOCKSIZE, lastBlock.getStartOffset());
|
||||
assertEquals(BLOCKSIZE, lastBlock.getBlockSize());
|
||||
|
||||
blocks = DFSClientAdapter.callGetBlockLocations(cluster.getNameNodeRpc(),
|
||||
fileInSnapshot2.toString(), BLOCKSIZE, 0);
|
||||
blockList = blocks.getLocatedBlocks();
|
||||
assertEquals(1, blockList.size());
|
||||
|
||||
// check blocks for file being written
|
||||
blocks = DFSClientAdapter.callGetBlockLocations(cluster.getNameNodeRpc(),
|
||||
file.toString(), 0, Long.MAX_VALUE);
|
||||
blockList = blocks.getLocatedBlocks();
|
||||
assertEquals(3, blockList.size());
|
||||
assertTrue(blocks.isUnderConstruction());
|
||||
assertFalse(blocks.isLastBlockComplete());
|
||||
|
||||
lastBlock = blocks.getLastLocatedBlock();
|
||||
assertEquals(BLOCKSIZE * 2, lastBlock.getStartOffset());
|
||||
assertEquals(BLOCKSIZE - 1, lastBlock.getBlockSize());
|
||||
out.close();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue