HDFS-14384. When lastLocatedBlock token expires, it will take 1~3 seconds to refetch it. Contributed by Surendra Singh Lilhore.

(cherry picked from commit c36014165c)
This commit is contained in:
Surendra Singh Lilhore 2019-11-06 19:28:55 +05:30
parent 56988e88f7
commit aa1c795dc9
2 changed files with 64 additions and 3 deletions

View File

@ -255,6 +255,13 @@ public class DFSInputStream extends FSInputStream
} }
} }
locatedBlocks = newInfo; locatedBlocks = newInfo;
long lastBlkBeingWrittenLength = getLastBlockLength();
fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
return lastBlkBeingWrittenLength;
}
private long getLastBlockLength() throws IOException{
long lastBlockBeingWrittenLength = 0; long lastBlockBeingWrittenLength = 0;
if (!locatedBlocks.isLastBlockComplete()) { if (!locatedBlocks.isLastBlockComplete()) {
final LocatedBlock last = locatedBlocks.getLastLocatedBlock(); final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
@ -273,8 +280,6 @@ public class DFSInputStream extends FSInputStream
} }
} }
fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
return lastBlockBeingWrittenLength; return lastBlockBeingWrittenLength;
} }
@ -457,7 +462,14 @@ public class DFSInputStream extends FSInputStream
if (newBlocks == null || newBlocks.locatedBlockCount() == 0) { if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
throw new EOFException("Could not find target position " + offset); throw new EOFException("Could not find target position " + offset);
} }
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); // Update the LastLocatedBlock, if offset is for last block.
if (offset >= locatedBlocks.getFileLength()) {
locatedBlocks = newBlocks;
lastBlockBeingWrittenLength = getLastBlockLength();
} else {
locatedBlocks.insertRange(targetBlockIdx,
newBlocks.getLocatedBlocks());
}
} }
return locatedBlocks.get(targetBlockIdx); return locatedBlocks.get(targetBlockIdx);
} }

View File

@ -34,6 +34,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.DataOutput; import java.io.DataOutput;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Calendar; import java.util.Calendar;
import java.util.EnumSet; import java.util.EnumSet;
@ -44,12 +45,14 @@ import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -64,6 +67,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRep
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.TestWritable; import org.apache.hadoop.io.TestWritable;
@ -92,6 +97,7 @@ import org.mockito.stubbing.Answer;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
/** Unit tests for block tokens */ /** Unit tests for block tokens */
@ -887,4 +893,47 @@ public class TestBlockToken {
new DataInputStream(new ByteArrayInputStream(masterId.getBytes()))); new DataInputStream(new ByteArrayInputStream(masterId.getBytes())));
assertArrayEquals(password, sm.retrievePassword(slaveId)); assertArrayEquals(password, sm.retrievePassword(slaveId));
} }
/** Test for last in-progress block token expiry.
 * 1. Write file with one block which is in-progress.
 * 2. Open input stream and close the output stream.
 * 3. Wait for block token expiration and read the data.
 * 4. Read should be success.
 */
@Test
public void testLastLocatedBlockTokenExpiry()
    throws IOException, InterruptedException {
  Configuration conf = new Configuration();
  conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
  try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(1).build()) {
    cluster.waitClusterUp();
    final NameNode nn = cluster.getNameNode();
    final BlockManager bm = nn.getNamesystem().getBlockManager();
    final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();

    // Set a short token lifetime (1 second) so the token cached for the
    // in-progress last block expires while the input stream is still open.
    SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);

    DistributedFileSystem fs = cluster.getFileSystem();
    Path p = new Path("/tmp/abc.log");
    byte[] data = "hello\n".getBytes(StandardCharsets.UTF_8);

    FSDataOutputStream out = fs.create(p);
    out.write(data);
    // hflush() makes the data readable while the last block stays
    // under construction.
    out.hflush();

    // Open the stream while the last block is in-progress, then complete
    // the file; the stream still holds the (soon expired) last-block token.
    // try-with-resources closes the input stream the original test leaked.
    try (FSDataInputStream in = fs.open(p)) {
      out.close();

      // Wait long enough for the last block token to expire.
      Thread.sleep(2000L);

      byte[] readData = new byte[data.length];
      long startTime = System.currentTimeMillis();
      int bytesRead = in.read(readData);
      long elapsed = System.currentTimeMillis() - startTime;

      // DFSInputStream#refetchLocations() waits a minimum of 1 second
      // before re-fetching completed located blocks; an expired last-block
      // token must be refreshed without incurring that penalty.
      assertTrue("Should not wait for refetch complete located blocks",
          elapsed < 1000L);
      // Verify the read actually succeeded and returned the written bytes
      // (the original test ignored the read result entirely).
      assertEquals("Unexpected number of bytes read", data.length, bytesRead);
      assertArrayEquals("Read data should match written data",
          data, readData);
    }
  }
}
} }