HDFS-16598. Fix DataNode FsDatasetImpl lock issue without GS checks. (#4366). Contributed by ZanderXu.

Reviewed-by: Mingxiang Li <liaiphag0@gmail.com>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
xuzq 2022-06-14 21:48:03 +08:00 committed by GitHub
parent bebf03a66e
commit d0715b1024
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 11 deletions

View File

@ -906,6 +906,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return info;
}
String getStorageUuidForLock(ExtendedBlock b)
throws ReplicaNotFoundException {
return getReplicaInfo(b.getBlockPoolId(), b.getBlockId())
.getStorageUuid();
}
/**
* Returns handles to the block file and its metadata file
*/
@ -913,7 +919,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
@ -1379,7 +1385,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
@ -1562,7 +1568,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (true) {
try {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
@ -1665,7 +1671,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (true) {
try {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
ReplicaInfo replicaInfo =
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
@ -1697,7 +1703,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
// check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
@ -1759,7 +1765,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
@ -1957,7 +1963,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo finalizedReplicaInfo = null;
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
@ -2041,7 +2047,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo != null &&
@ -2992,7 +2998,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final long newlength) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
oldBlock.getBlockPoolId(), getReplicaInfo(oldBlock).getStorageUuid())) {
oldBlock.getBlockPoolId(), getStorageUuidForLock(oldBlock))) {
//get replica
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());

View File

@ -245,8 +245,10 @@ public class TestWriteToReplica {
Assert.fail("Should not have appended to a non-existent replica " +
blocks[NON_EXISTENT]);
} catch (ReplicaNotFoundException e) {
Assert.assertEquals(ReplicaNotFoundException.NON_EXISTENT_REPLICA +
blocks[NON_EXISTENT], e.getMessage());
String expectMessage = ReplicaNotFoundException.NON_EXISTENT_REPLICA
+ blocks[NON_EXISTENT].getBlockPoolId() + ":"
+ blocks[NON_EXISTENT].getBlockId();
Assert.assertEquals(expectMessage, e.getMessage());
}
newGS = blocks[FINALIZED].getGenerationStamp()+1;