HDFS-7885. Datanode should not trust the generation stamp provided by client. Contributed by Tsz Wo Nicholas Sze.

(cherry picked from commit 24db0812be)
Author: Jing Zhao
Date:   2015-03-06 10:55:56 -08:00
Parent: a5f3fb4dc1
Commit: 994dadb9ba

3 changed files with 81 additions and 0 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -798,6 +798,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7434. DatanodeID hashCode should not be mutable. (daryn via kihwal)
 
+    HDFS-7885. Datanode should not trust the generation stamp provided by
+    client. (Tsz Wo Nicholas Sze via jing9)
+
   BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -2568,6 +2568,21 @@ public synchronized void deleteBlockPool(String bpid, boolean force)
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
+    synchronized(this) {
+      final Replica replica = volumeMap.get(block.getBlockPoolId(),
+          block.getBlockId());
+      if (replica == null) {
+        throw new ReplicaNotFoundException(block);
+      }
+      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+        throw new IOException(
+            "Replica generation stamp < block generation stamp, block="
+            + block + ", replica=" + replica);
+      } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
+        block.setGenerationStamp(replica.getGenerationStamp());
+      }
+    }
+
     File datafile = getBlockFile(block);
     File metafile = FsDatasetUtil.getMetaFile(datafile, block.getGenerationStamp());
     BlockLocalPathInfo info = new BlockLocalPathInfo(block,
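
For context, and not part of the patch itself: the check added above has three outcomes, which the following minimal, hypothetical Java sketch replays in isolation. SimpleBlock and SimpleReplica are made-up stand-ins for ExtendedBlock and Replica, and the generation-stamp values are illustrative only.

// Hypothetical sketch of the generation-stamp check, not HDFS code.
import java.io.IOException;

public class GenStampCheckSketch {
  static final class SimpleReplica {
    final long genStamp;
    SimpleReplica(long genStamp) { this.genStamp = genStamp; }
  }

  static final class SimpleBlock {
    long genStamp;
    SimpleBlock(long genStamp) { this.genStamp = genStamp; }
  }

  // Mirrors the datanode-side logic: a missing replica is an error; a
  // replica older than the client's view is an error; a newer replica
  // silently upgrades the client's stamp.
  static void check(SimpleBlock block, SimpleReplica replica)
      throws IOException {
    if (replica == null) {
      throw new IOException("replica not found");
    }
    if (replica.genStamp < block.genStamp) {
      throw new IOException(
          "replica generation stamp < block generation stamp");
    } else if (replica.genStamp > block.genStamp) {
      block.genStamp = replica.genStamp;
    }
  }

  public static void main(String[] args) throws IOException {
    SimpleBlock staleClientView = new SimpleBlock(1001);
    check(staleClientView, new SimpleReplica(1002)); // e.g. after an append
    System.out.println(staleClientView.genStamp);    // prints 1002
  }
}

The asymmetry is the point of the fix: an older on-disk replica is an error, while a newer one overrides the client-supplied stamp, because the datanode's own volume map, not the client, is authoritative.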

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java

@@ -30,11 +30,16 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.BeforeClass;
@@ -153,4 +158,62 @@ public void testBothOldAndNewShortCircuitConfigured() throws Exception {
     Arrays.equals(orig, buf);
     cluster.shutdown();
   }
+
+  @Test(timeout=20000)
+  public void testBlockReaderLocalLegacyWithAppend() throws Exception {
+    final short REPL_FACTOR = 1;
+    final HdfsConfiguration conf = getConfiguration(null);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final Path path = new Path("/testBlockReaderLocalLegacy");
+    DFSTestUtil.createFile(dfs, path, 10, REPL_FACTOR, 0);
+    DFSTestUtil.waitReplication(dfs, path, REPL_FACTOR);
+
+    final ClientDatanodeProtocol proxy;
+    final Token<BlockTokenIdentifier> token;
+    final ExtendedBlock originalBlock;
+    final long originalGS;
+    {
+      final LocatedBlock lb = cluster.getNameNode().getRpcServer()
+          .getBlockLocations(path.toString(), 0, 1).get(0);
+      proxy = DFSUtil.createClientDatanodeProtocolProxy(
+          lb.getLocations()[0], conf, 60000, false);
+      token = lb.getBlockToken();
+
+      // get block and generation stamp
+      final ExtendedBlock blk = new ExtendedBlock(lb.getBlock());
+      originalBlock = new ExtendedBlock(blk);
+      originalGS = originalBlock.getGenerationStamp();
+
+      // test getBlockLocalPathInfo
+      final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(blk, token);
+      Assert.assertEquals(originalGS, info.getBlock().getGenerationStamp());
+    }
+
+    { // append one byte
+      FSDataOutputStream out = dfs.append(path);
+      out.write(1);
+      out.close();
+    }
+
+    {
+      // get new generation stamp
+      final LocatedBlock lb = cluster.getNameNode().getRpcServer()
+          .getBlockLocations(path.toString(), 0, 1).get(0);
+      final long newGS = lb.getBlock().getGenerationStamp();
+      Assert.assertTrue(newGS > originalGS);
+      // getBlockLocalPathInfo using the original block.
+      Assert.assertEquals(originalGS, originalBlock.getGenerationStamp());
+      final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(
+          originalBlock, token);
+      Assert.assertEquals(newGS, info.getBlock().getGenerationStamp());
+    }
+
+    cluster.shutdown();
+  }
 }
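
A note on why the test's final assertion matters, with a hypothetical sketch that is not from the patch: datanodes name block metadata files with the generation stamp embedded (blk_<id>_<genStamp>.meta, resolved via FsDatasetUtil.getMetaFile in the hunk above). Had the datanode trusted the client's pre-append stamp, it would have computed a meta file name that no longer exists after the append re-stamped the block; with the fix it substitutes its own stamp and hands it back, which is exactly what the test asserts. The helper below is a rough stand-in for that naming convention, with an arbitrary example block id.

// Hypothetical illustration of the blk_<id>_<genStamp>.meta convention.
public class MetaFileNameSketch {
  // Rough stand-in for the naming resolved via FsDatasetUtil.getMetaFile.
  static String metaFileName(long blockId, long genStamp) {
    return "blk_" + blockId + "_" + genStamp + ".meta";
  }

  public static void main(String[] args) {
    long blockId = 1073741825L;                      // an example block id
    System.out.println(metaFileName(blockId, 1001)); // client's stale view
    System.out.println(metaFileName(blockId, 1002)); // on-disk name after append
  }
}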