HDFS-10828. Fix usage of FsDatasetImpl object lock in ReplicaMap. (Arpit Agarwal)

This commit is contained in:
Arpit Agarwal 2016-09-27 09:38:29 -07:00
parent f55eb981dd
commit 8ae4729107
7 changed files with 36 additions and 30 deletions

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@ -769,7 +770,7 @@ class BlockPoolSlice {
private boolean readReplicasFromCache(ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap) {
ReplicaMap tmpReplicaMap = new ReplicaMap(this);
ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
// Check whether the file exists or not.
if (!replicaFile.exists()) {

View File

@ -251,7 +251,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private boolean blockPinningEnabled;
private final int maxDataLength;
private final AutoCloseableLock datasetLock;
@VisibleForTesting
final AutoCloseableLock datasetLock;
private final Condition datasetLockCondition;
/**
@ -293,7 +294,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(this);
volumeMap = new ReplicaMap(datasetLock);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
@SuppressWarnings("unchecked")
@ -419,7 +420,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(this);
ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
activateVolume(tempVolumeMap, sd, storageType, ref);
@ -453,7 +454,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume =
createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
ArrayList<IOException> exceptions = Lists.newArrayList();
for (final NamespaceInfo nsInfo : nsInfos) {
@ -2362,7 +2363,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
while (true) {
try {
synchronized (map.getMutex()) {
try (AutoCloseableLock lock = map.getLock().acquire()) {
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
}
} catch (MustStopExistingWriter e) {

View File

@ -26,13 +26,14 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import org.apache.hadoop.util.AutoCloseableLock;
/**
* Maintains the replica map.
*/
class ReplicaMap {
// Object using which this class is synchronized
private final Object mutex;
// Lock object to synchronize this instance.
private final AutoCloseableLock lock;
// Map of block pool Id to a set of ReplicaInfo.
private final Map<String, FoldedTreeSet<ReplicaInfo>> map = new HashMap<>();
@ -49,16 +50,16 @@ class ReplicaMap {
}
};
ReplicaMap(Object mutex) {
if (mutex == null) {
ReplicaMap(AutoCloseableLock lock) {
if (lock == null) {
throw new HadoopIllegalArgumentException(
"Object to synchronize on cannot be null");
"Lock to synchronize on cannot be null");
}
this.mutex = mutex;
this.lock = lock;
}
String[] getBlockPoolList() {
synchronized(mutex) {
try (AutoCloseableLock l = lock.acquire()) {
return map.keySet().toArray(new String[map.keySet().size()]);
}
}
@ -103,7 +104,7 @@ class ReplicaMap {
*/
ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid);
synchronized(mutex) {
try (AutoCloseableLock l = lock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
return null;
@ -123,7 +124,7 @@ class ReplicaMap {
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
synchronized(mutex) {
try (AutoCloseableLock l = lock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
// Add an entry for block pool if it does not exist already
@ -152,7 +153,7 @@ class ReplicaMap {
ReplicaInfo remove(String bpid, Block block) {
checkBlockPool(bpid);
checkBlock(block);
synchronized(mutex) {
try (AutoCloseableLock l = lock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set != null) {
ReplicaInfo replicaInfo =
@ -175,7 +176,7 @@ class ReplicaMap {
*/
ReplicaInfo remove(String bpid, long blockId) {
checkBlockPool(bpid);
synchronized(mutex) {
try (AutoCloseableLock l = lock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set != null) {
return set.removeAndGet(blockId, LONG_AND_BLOCK_COMPARATOR);
@ -190,7 +191,7 @@ class ReplicaMap {
* @return the number of replicas in the map
*/
int size(String bpid) {
synchronized(mutex) {
try (AutoCloseableLock l = lock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
return set != null ? set.size() : 0;
}
@ -199,9 +200,9 @@ class ReplicaMap {
/**
* Get a collection of the replicas for given block pool
* This method is <b>not synchronized</b>. It needs to be synchronized
* externally using the mutex, both for getting the replicas
* externally using the lock, both for getting the replicas
* values from the map and iterating over it. Mutex can be accessed using
* {@link #getMutext()} method.
* {@link #getLock()} method.
*
* @param bpid block pool id
* @return a collection of the replicas belonging to the block pool
@ -212,7 +213,7 @@ class ReplicaMap {
void initBlockPool(String bpid) {
checkBlockPool(bpid);
synchronized(mutex) {
try (AutoCloseableLock l = lock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
// Add an entry for block pool if it does not exist already
@ -224,16 +225,16 @@ class ReplicaMap {
void cleanUpBlockPool(String bpid) {
checkBlockPool(bpid);
synchronized(mutex) {
try (AutoCloseableLock l = lock.acquire()) {
map.remove(bpid);
}
}
/**
* Give access to mutex used for synchronizing ReplicasMap
* @return object used as lock
* Get the lock object used for synchronizing ReplicasMap
* @return lock object
*/
Object getMutex() {
return mutex;
AutoCloseableLock getLock() {
return lock;
}
}

View File

@ -407,7 +407,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
@Override
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
// Reload replicas from the disk.
ReplicaMap replicaMap = new ReplicaMap(dataset);
ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock);
try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
for (FsVolumeSpi vol : refs) {
FsVolumeImpl volume = (FsVolumeImpl) vol;

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.junit.Assert;
import org.junit.Test;
@ -234,7 +235,7 @@ public class TestInterDatanodeProtocol {
final long firstblockid = 10000L;
final long gs = 7777L;
final long length = 22L;
final ReplicaMap map = new ReplicaMap(this);
final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
String bpid = "BP-TEST";
final Block[] blocks = new Block[5];
for(int i = 0; i < blocks.length; i++) {

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.util.AutoCloseableLock;
import org.junit.Before;
import org.junit.Test;
@ -30,7 +31,7 @@ import org.junit.Test;
* Unit test for ReplicasMap class
*/
public class TestReplicaMap {
private final ReplicaMap map = new ReplicaMap(TestReplicaMap.class);
private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
private final String bpid = "BP-TEST";
private final Block block = new Block(1234, 1234, 1234);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert;
import org.junit.Test;
@ -534,7 +535,7 @@ public class TestWriteToReplica {
bpList.size() == 2);
createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
ReplicaMap oldReplicaMap = new ReplicaMap(this);
ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
oldReplicaMap.addAll(dataSet.volumeMap);
cluster.restartDataNode(0);