HDFS-13339. Volume reference can't be released and may lead to deadlock when DataXceiver does a check volume. Contributed by Jim Brennan, Zsolt Venczel.
(cherry picked from commit c675719c3f
)
This commit is contained in:
parent
7a309f1f91
commit
d5f7aa2285
|
@ -47,6 +47,7 @@ import java.util.HashSet;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -104,6 +105,8 @@ public class DatasetVolumeChecker {
|
|||
private static final VolumeCheckContext IGNORED_CONTEXT =
|
||||
new VolumeCheckContext();
|
||||
|
||||
private final ExecutorService checkVolumeResultHandlerExecutorService;
|
||||
|
||||
/**
|
||||
* @param conf Configuration object.
|
||||
* @param timer {@link Timer} object used for throttling checks.
|
||||
|
@ -165,6 +168,12 @@ public class DatasetVolumeChecker {
|
|||
.setNameFormat("DataNode DiskChecker thread %d")
|
||||
.setDaemon(true)
|
||||
.build()));
|
||||
|
||||
checkVolumeResultHandlerExecutorService = Executors.newCachedThreadPool(
|
||||
new ThreadFactoryBuilder()
|
||||
.setNameFormat("VolumeCheck ResultHandler thread %d")
|
||||
.setDaemon(true)
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -295,7 +304,9 @@ public class DatasetVolumeChecker {
|
|||
Futures.addCallback(olf.get(),
|
||||
new ResultHandler(volumeReference, new HashSet<FsVolumeSpi>(),
|
||||
new HashSet<FsVolumeSpi>(),
|
||||
new AtomicLong(1), callback));
|
||||
new AtomicLong(1), callback),
|
||||
checkVolumeResultHandlerExecutorService
|
||||
);
|
||||
return true;
|
||||
} else {
|
||||
IOUtils.cleanup(null, volumeReference);
|
||||
|
|
|
@ -19,11 +19,13 @@
|
|||
package org.apache.hadoop.hdfs.server.datanode.checker;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.FakeTimer;
|
||||
import org.junit.Rule;
|
||||
|
@ -121,6 +123,13 @@ public class TestDatasetVolumeChecker {
|
|||
}
|
||||
});
|
||||
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return (numCallbackInvocations.get() > 0);
|
||||
}
|
||||
}, 5, 10000);
|
||||
|
||||
// Ensure that the check was invoked at least once.
|
||||
verify(volume, times(1)).check(any(VolumeCheckContext.class));
|
||||
if (result) {
|
||||
|
|
Loading…
Reference in New Issue