HDFS-14384. When lastLocatedBlock token expire, it will take 1~3s second to refetch it. Contributed by Surendra Singh Lilhore.

This commit is contained in:
Surendra Singh Lilhore 2019-11-06 19:28:55 +05:30
parent ee8addbec4
commit c36014165c
2 changed files with 64 additions and 3 deletions

View File

@ -257,6 +257,13 @@ public class DFSInputStream extends FSInputStream
}
}
locatedBlocks = newInfo;
long lastBlkBeingWrittenLength = getLastBlockLength();
fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
return lastBlkBeingWrittenLength;
}
private long getLastBlockLength() throws IOException{
long lastBlockBeingWrittenLength = 0;
if (!locatedBlocks.isLastBlockComplete()) {
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
@ -275,8 +282,6 @@ public class DFSInputStream extends FSInputStream
}
}
fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
return lastBlockBeingWrittenLength;
}
@ -459,7 +464,14 @@ public class DFSInputStream extends FSInputStream
if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
throw new EOFException("Could not find target position " + offset);
}
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
// Update the LastLocatedBlock, if offset is for last block.
if (offset >= locatedBlocks.getFileLength()) {
locatedBlocks = newBlocks;
lastBlockBeingWrittenLength = getLastBlockLength();
} else {
locatedBlocks.insertRange(targetBlockIdx,
newBlocks.getLocatedBlocks());
}
}
return locatedBlocks.get(targetBlockIdx);
}

View File

@ -34,6 +34,7 @@ import java.io.File;
import java.io.IOException;
import java.io.DataOutput;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Calendar;
import java.util.EnumSet;
@ -44,12 +45,14 @@ import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
@ -64,6 +67,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRep
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.TestWritable;
@ -91,6 +96,7 @@ import org.mockito.stubbing.Answer;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.fs.StorageType;
/** Unit tests for block tokens */
@ -891,4 +897,47 @@ public class TestBlockToken {
new DataInputStream(new ByteArrayInputStream(masterId.getBytes())));
assertArrayEquals(password, sm.retrievePassword(slaveId));
}
/** Test for last in-progress block token expiry.
* 1. Write file with one block which is in-progress.
* 2. Open input stream and close the output stream.
* 3. Wait for block token expiration and read the data.
* 4. Read should be success.
*/
@Test
public void testLastLocatedBlockTokenExpiry()
throws IOException, InterruptedException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build()) {
cluster.waitClusterUp();
final NameNode nn = cluster.getNameNode();
final BlockManager bm = nn.getNamesystem().getBlockManager();
final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
// set a short token lifetime (1 second)
SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
DistributedFileSystem fs = cluster.getFileSystem();
Path p = new Path("/tmp/abc.log");
FSDataOutputStream out = fs.create(p);
byte[] data = "hello\n".getBytes(StandardCharsets.UTF_8);
out.write(data);
out.hflush();
FSDataInputStream in = fs.open(p);
out.close();
// wait for last block token to expire
Thread.sleep(2000L);
byte[] readData = new byte[data.length];
long startTime = System.currentTimeMillis();
in.read(readData);
// DFSInputStream#refetchLocations() minimum wait for 1sec to refetch
// complete located blocks.
assertTrue("Should not wait for refetch complete located blocks",
1000L > (System.currentTimeMillis() - startTime));
}
}
}