diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index f9e7d6ec70d..757924d8a70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -257,6 +257,13 @@ public class DFSInputStream extends FSInputStream } } locatedBlocks = newInfo; + long lastBlkBeingWrittenLength = getLastBlockLength(); + fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo(); + + return lastBlkBeingWrittenLength; + } + + private long getLastBlockLength() throws IOException{ long lastBlockBeingWrittenLength = 0; if (!locatedBlocks.isLastBlockComplete()) { final LocatedBlock last = locatedBlocks.getLastLocatedBlock(); @@ -275,8 +282,6 @@ public class DFSInputStream extends FSInputStream } } - fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo(); - return lastBlockBeingWrittenLength; } @@ -459,7 +464,14 @@ public class DFSInputStream extends FSInputStream if (newBlocks == null || newBlocks.locatedBlockCount() == 0) { throw new EOFException("Could not find target position " + offset); } - locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); + // Update the LastLocatedBlock, if offset is for last block. + if (offset >= locatedBlocks.getFileLength()) { + locatedBlocks = newBlocks; + lastBlockBeingWrittenLength = getLastBlockLength(); + } else { + locatedBlocks.insertRange(targetBlockIdx, + newBlocks.getLocatedBlocks()); + } } return locatedBlocks.get(targetBlockIdx); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java index 59b28c1fc5b..4e2225967f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java @@ -34,6 +34,7 @@ import java.io.File; import java.io.IOException; import java.io.DataOutput; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Calendar; import java.util.EnumSet; @@ -44,12 +45,14 @@ import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; @@ -64,6 +67,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRep import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.TestWritable; @@ -91,6 +96,7 @@ import org.mockito.stubbing.Answer; import com.google.protobuf.BlockingService; import com.google.protobuf.ServiceException; + import org.apache.hadoop.fs.StorageType; /** Unit tests for block tokens */ @@ -891,4 +897,47 @@ public class TestBlockToken { new DataInputStream(new ByteArrayInputStream(masterId.getBytes()))); assertArrayEquals(password, sm.retrievePassword(slaveId)); } + + /** Test for last in-progress block token expiry. + * 1. Write file with one block which is in-progress. + * 2. Open input stream and close the output stream. + * 3. Wait for block token expiration and read the data. + * 4. Read should be success. + */ + @Test + public void testLastLocatedBlockTokenExpiry() + throws IOException, InterruptedException { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build()) { + cluster.waitClusterUp(); + final NameNode nn = cluster.getNameNode(); + final BlockManager bm = nn.getNamesystem().getBlockManager(); + final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); + + // set a short token lifetime (1 second) + SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); + + DistributedFileSystem fs = cluster.getFileSystem(); + Path p = new Path("/tmp/abc.log"); + FSDataOutputStream out = fs.create(p); + byte[] data = "hello\n".getBytes(StandardCharsets.UTF_8); + out.write(data); + out.hflush(); + FSDataInputStream in = fs.open(p); + out.close(); + + // wait for last block token to expire + Thread.sleep(2000L); + + byte[] readData = new byte[data.length]; + long startTime = System.currentTimeMillis(); + in.read(readData); + // DFSInputStream#refetchLocations() minimum wait for 1sec to refetch + // complete located blocks. + assertTrue("Should not wait for refetch complete located blocks", + 1000L > (System.currentTimeMillis() - startTime)); + } + } }