HDFS-14384. When lastLocatedBlock token expire, it will take 1~3s second to refetch it. Contributed by Surendra Singh Lilhore.
(cherry picked from commit c36014165c
)
This commit is contained in:
parent
cbd501636c
commit
4690142e86
|
@ -254,6 +254,13 @@ public class DFSInputStream extends FSInputStream
|
|||
}
|
||||
}
|
||||
locatedBlocks = newInfo;
|
||||
long lastBlkBeingWrittenLength = getLastBlockLength();
|
||||
fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
|
||||
|
||||
return lastBlkBeingWrittenLength;
|
||||
}
|
||||
|
||||
private long getLastBlockLength() throws IOException{
|
||||
long lastBlockBeingWrittenLength = 0;
|
||||
if (!locatedBlocks.isLastBlockComplete()) {
|
||||
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
|
||||
|
@ -272,8 +279,6 @@ public class DFSInputStream extends FSInputStream
|
|||
}
|
||||
}
|
||||
|
||||
fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
|
||||
|
||||
return lastBlockBeingWrittenLength;
|
||||
}
|
||||
|
||||
|
@ -456,7 +461,14 @@ public class DFSInputStream extends FSInputStream
|
|||
if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
|
||||
throw new EOFException("Could not find target position " + offset);
|
||||
}
|
||||
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
|
||||
// Update the LastLocatedBlock, if offset is for last block.
|
||||
if (offset >= locatedBlocks.getFileLength()) {
|
||||
locatedBlocks = newBlocks;
|
||||
lastBlockBeingWrittenLength = getLastBlockLength();
|
||||
} else {
|
||||
locatedBlocks.insertRange(targetBlockIdx,
|
||||
newBlocks.getLocatedBlocks());
|
||||
}
|
||||
}
|
||||
return locatedBlocks.get(targetBlockIdx);
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.io.DataOutput;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Calendar;
|
||||
import java.util.EnumSet;
|
||||
|
@ -44,12 +45,14 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.mockito.Mockito;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
|
@ -64,6 +67,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRep
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.TestWritable;
|
||||
|
@ -92,6 +97,7 @@ import org.mockito.stubbing.Answer;
|
|||
import com.google.protobuf.BlockingService;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
|
||||
/** Unit tests for block tokens */
|
||||
|
@ -886,4 +892,47 @@ public class TestBlockToken {
|
|||
new DataInputStream(new ByteArrayInputStream(masterId.getBytes())));
|
||||
assertArrayEquals(password, sm.retrievePassword(slaveId));
|
||||
}
|
||||
|
||||
/** Test for last in-progress block token expiry.
|
||||
* 1. Write file with one block which is in-progress.
|
||||
* 2. Open input stream and close the output stream.
|
||||
* 3. Wait for block token expiration and read the data.
|
||||
* 4. Read should be success.
|
||||
*/
|
||||
@Test
|
||||
public void testLastLocatedBlockTokenExpiry()
|
||||
throws IOException, InterruptedException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1).build()) {
|
||||
cluster.waitClusterUp();
|
||||
final NameNode nn = cluster.getNameNode();
|
||||
final BlockManager bm = nn.getNamesystem().getBlockManager();
|
||||
final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
|
||||
|
||||
// set a short token lifetime (1 second)
|
||||
SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
|
||||
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
Path p = new Path("/tmp/abc.log");
|
||||
FSDataOutputStream out = fs.create(p);
|
||||
byte[] data = "hello\n".getBytes(StandardCharsets.UTF_8);
|
||||
out.write(data);
|
||||
out.hflush();
|
||||
FSDataInputStream in = fs.open(p);
|
||||
out.close();
|
||||
|
||||
// wait for last block token to expire
|
||||
Thread.sleep(2000L);
|
||||
|
||||
byte[] readData = new byte[data.length];
|
||||
long startTime = System.currentTimeMillis();
|
||||
in.read(readData);
|
||||
// DFSInputStream#refetchLocations() minimum wait for 1sec to refetch
|
||||
// complete located blocks.
|
||||
assertTrue("Should not wait for refetch complete located blocks",
|
||||
1000L > (System.currentTimeMillis() - startTime));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue