HDFS-7885. Datanode should not trust the generation stamp provided by client. Contributed by Tsz Wo Nicholas Sze.
This commit is contained in:
parent
95bfd087dc
commit
24db0812be
|
@ -1104,6 +1104,9 @@ Release 2.7.0 - UNRELEASED
|
|||
|
||||
HDFS-7434. DatanodeID hashCode should not be mutable. (daryn via kihwal)
|
||||
|
||||
HDFS-7885. Datanode should not trust the generation stamp provided by
|
||||
client. (Tsz Wo Nicholas Sze via jing9)
|
||||
|
||||
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
|
||||
|
|
|
@ -2568,6 +2568,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
@Override // FsDatasetSpi
|
||||
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
|
||||
throws IOException {
|
||||
synchronized(this) {
|
||||
final Replica replica = volumeMap.get(block.getBlockPoolId(),
|
||||
block.getBlockId());
|
||||
if (replica == null) {
|
||||
throw new ReplicaNotFoundException(block);
|
||||
}
|
||||
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
||||
throw new IOException(
|
||||
"Replica generation stamp < block generation stamp, block="
|
||||
+ block + ", replica=" + replica);
|
||||
} else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
|
||||
block.setGenerationStamp(replica.getGenerationStamp());
|
||||
}
|
||||
}
|
||||
|
||||
File datafile = getBlockFile(block);
|
||||
File metafile = FsDatasetUtil.getMetaFile(datafile, block.getGenerationStamp());
|
||||
BlockLocalPathInfo info = new BlockLocalPathInfo(block,
|
||||
|
|
|
@ -30,11 +30,16 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -153,4 +158,62 @@ public class TestBlockReaderLocalLegacy {
|
|||
Arrays.equals(orig, buf);
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
@Test(timeout=20000)
|
||||
public void testBlockReaderLocalLegacyWithAppend() throws Exception {
|
||||
final short REPL_FACTOR = 1;
|
||||
final HdfsConfiguration conf = getConfiguration(null);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
|
||||
|
||||
final MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final Path path = new Path("/testBlockReaderLocalLegacy");
|
||||
DFSTestUtil.createFile(dfs, path, 10, REPL_FACTOR, 0);
|
||||
DFSTestUtil.waitReplication(dfs, path, REPL_FACTOR);
|
||||
|
||||
final ClientDatanodeProtocol proxy;
|
||||
final Token<BlockTokenIdentifier> token;
|
||||
final ExtendedBlock originalBlock;
|
||||
final long originalGS;
|
||||
{
|
||||
final LocatedBlock lb = cluster.getNameNode().getRpcServer()
|
||||
.getBlockLocations(path.toString(), 0, 1).get(0);
|
||||
proxy = DFSUtil.createClientDatanodeProtocolProxy(
|
||||
lb.getLocations()[0], conf, 60000, false);
|
||||
token = lb.getBlockToken();
|
||||
|
||||
// get block and generation stamp
|
||||
final ExtendedBlock blk = new ExtendedBlock(lb.getBlock());
|
||||
originalBlock = new ExtendedBlock(blk);
|
||||
originalGS = originalBlock.getGenerationStamp();
|
||||
|
||||
// test getBlockLocalPathInfo
|
||||
final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(blk, token);
|
||||
Assert.assertEquals(originalGS, info.getBlock().getGenerationStamp());
|
||||
}
|
||||
|
||||
{ // append one byte
|
||||
FSDataOutputStream out = dfs.append(path);
|
||||
out.write(1);
|
||||
out.close();
|
||||
}
|
||||
|
||||
{
|
||||
// get new generation stamp
|
||||
final LocatedBlock lb = cluster.getNameNode().getRpcServer()
|
||||
.getBlockLocations(path.toString(), 0, 1).get(0);
|
||||
final long newGS = lb.getBlock().getGenerationStamp();
|
||||
Assert.assertTrue(newGS > originalGS);
|
||||
|
||||
// getBlockLocalPathInfo using the original block.
|
||||
Assert.assertEquals(originalGS, originalBlock.getGenerationStamp());
|
||||
final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(
|
||||
originalBlock, token);
|
||||
Assert.assertEquals(newGS, info.getBlock().getGenerationStamp());
|
||||
}
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue