HDFS-10828. Fix usage of FsDatasetImpl object lock in ReplicaMap. (Arpit Agarwal)

Arpit Agarwal 2016-09-27 09:10:21 -07:00
parent 756dbc505e
commit 3f5482bb69
6 changed files with 35 additions and 29 deletions
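The patch replaces ReplicaMap's synchronization on an arbitrary Object mutex with org.apache.hadoop.util.AutoCloseableLock, so that FsDatasetImpl can hand the map its own datasetLock and map operations serialize with the rest of the dataset. Below is a minimal sketch of the try-with-resources locking idiom the patch adopts; the Counter class and its field are illustrative stand-ins, not part of the patch.

import org.apache.hadoop.util.AutoCloseableLock;

// Illustrative class only: the locking idiom is what matters here.
class Counter {
  private final AutoCloseableLock lock = new AutoCloseableLock();
  private long value;

  long incrementAndGet() {
    // acquire() blocks until the lock is held and returns the lock itself,
    // so try-with-resources releases it when the block exits, even on an
    // exception -- the same guarantee synchronized (mutex) { ... } gave.
    try (AutoCloseableLock l = lock.acquire()) {
      return ++value;
    }
  }
}

Unlike a synchronized block, the lock instance can be created in one class and shared with another, which is exactly what the FsDatasetImpl changes below rely on.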

BlockPoolSlice.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -740,7 +741,7 @@ class BlockPoolSlice {
 
   private boolean readReplicasFromCache(ReplicaMap volumeMap,
       final RamDiskReplicaTracker lazyWriteReplicaMap) {
-    ReplicaMap tmpReplicaMap = new ReplicaMap(this);
+    ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
     File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
     // Check whether the file exists or not.
     if (!replicaFile.exists()) {

FsDatasetImpl.java

@@ -271,7 +271,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private boolean blockPinningEnabled;
   private final int maxDataLength;
 
-  private final AutoCloseableLock datasetLock;
+  @VisibleForTesting
+  final AutoCloseableLock datasetLock;
   private final Condition datasetLockCondition;
 
   /**
@@ -313,7 +314,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
-    volumeMap = new ReplicaMap(this);
+    volumeMap = new ReplicaMap(datasetLock);
     ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
     @SuppressWarnings("unchecked")
@@ -434,7 +435,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       FsVolumeImpl fsVolume = new FsVolumeImpl(
           this, sd.getStorageUuid(), dir, this.conf, storageType);
       FsVolumeReference ref = fsVolume.obtainReference();
-      ReplicaMap tempVolumeMap = new ReplicaMap(this);
+      ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
       fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
       activateVolume(tempVolumeMap, sd, storageType, ref);
@@ -468,7 +469,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     StorageType storageType = location.getStorageType();
     final FsVolumeImpl fsVolume =
         createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
-    final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
+    final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
     ArrayList<IOException> exceptions = Lists.newArrayList();
 
     for (final NamespaceInfo nsInfo : nsInfos) {
@@ -2474,7 +2475,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
     while (true) {
       try {
-        synchronized (map.getMutex()) {
+        try (AutoCloseableLock lock = map.getLock().acquire()) {
           return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
         }
       } catch (MustStopExistingWriter e) {
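The hunks above show both constructor patterns: the long-lived volumeMap and the temporary map in the hunk at line 435 share datasetLock, while the temporary map built around the nsInfos loop gets a fresh private AutoCloseableLock, which suffices as long as the map stays confined to the calling thread. A hedged sketch of the distinction; the class and field names are illustrative, mirroring the diff:

import org.apache.hadoop.util.AutoCloseableLock;

// Sketch only; assumes it lives in the same package as the
// package-private ReplicaMap class.
class LockSharingSketch {
  // Stand-in for FsDatasetImpl's dataset-wide lock.
  private final AutoCloseableLock datasetLock = new AutoCloseableLock();

  void sketch() {
    // Shared lock: operations on this map serialize with every other
    // holder of datasetLock.
    ReplicaMap volumeMap = new ReplicaMap(datasetLock);

    // Private lock: the map is thread-confined until handed off, so a
    // fresh lock merely satisfies the constructor's non-null check.
    ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
  }
}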

ReplicaMap.java

@@ -25,28 +25,29 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.util.LightWeightResizableGSet;
+import org.apache.hadoop.util.AutoCloseableLock;
 
 /**
  * Maintains the replica map.
  */
 class ReplicaMap {
-  // Object using which this class is synchronized
-  private final Object mutex;
+  // Lock object to synchronize this instance.
+  private final AutoCloseableLock lock;
 
   // Map of block pool Id to another map of block Id to ReplicaInfo.
   private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
       new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
 
-  ReplicaMap(Object mutex) {
-    if (mutex == null) {
+  ReplicaMap(AutoCloseableLock lock) {
+    if (lock == null) {
       throw new HadoopIllegalArgumentException(
-          "Object to synchronize on cannot be null");
+          "Lock to synchronize on cannot be null");
     }
-    this.mutex = mutex;
+    this.lock = lock;
   }
 
   String[] getBlockPoolList() {
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);
     }
   }
@@ -91,7 +92,7 @@ class ReplicaMap {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.get(new Block(blockId)) : null;
     }
@@ -108,7 +109,7 @@ class ReplicaMap {
   ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -137,7 +138,7 @@ class ReplicaMap {
   ReplicaInfo remove(String bpid, Block block) {
     checkBlockPool(bpid);
     checkBlock(block);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         ReplicaInfo replicaInfo = m.get(block);
@@ -159,7 +160,7 @@ class ReplicaMap {
    */
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         return m.remove(new Block(blockId));
@@ -175,7 +176,7 @@ class ReplicaMap {
   int size(String bpid) {
     LightWeightResizableGSet<Block, ReplicaInfo> m = null;
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       m = map.get(bpid);
       return m != null ? m.size() : 0;
     }
   }
@@ -184,9 +185,9 @@ class ReplicaMap {
   /**
    * Get a collection of the replicas for given block pool
    * This method is <b>not synchronized</b>. It needs to be synchronized
-   * externally using the mutex, both for getting the replicas
+   * externally using the lock, both for getting the replicas
    * values from the map and iterating over it. Mutex can be accessed using
-   * {@link #getMutext()} method.
+   * {@link #getLock()} method.
    *
    * @param bpid block pool id
    * @return a collection of the replicas belonging to the block pool
@@ -199,7 +200,7 @@ class ReplicaMap {
 
   void initBlockPool(String bpid) {
     checkBlockPool(bpid);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -211,16 +212,16 @@ class ReplicaMap {
 
   void cleanUpBlockPool(String bpid) {
     checkBlockPool(bpid);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       map.remove(bpid);
     }
   }
 
   /**
-   * Give access to mutex used for synchronizing ReplicasMap
-   * @return object used as lock
+   * Get the lock object used for synchronizing ReplicasMap
+   * @return lock object
    */
-  Object getMutex() {
-    return mutex;
+  AutoCloseableLock getLock() {
+    return lock;
   }
 }
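Per the (unchanged) javadoc above, the replicas-for-a-block-pool accessor is not synchronized internally; callers must hold the map's lock across both the lookup and the iteration, with getLock() replacing the old getMutex(). A hedged sketch of such an external caller; the replicas(bpid) accessor name is inferred from the javadoc and does not appear in this diff:

import java.util.Collection;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.util.AutoCloseableLock;

// Sketch only; assumes the same package as ReplicaMap so the
// package-private class and its methods are visible.
class IterationSketch {
  static void scan(ReplicaMap map, String bpid) {
    // Hold the lock across both the lookup and the iteration.
    try (AutoCloseableLock l = map.getLock().acquire()) {
      Collection<ReplicaInfo> replicas = map.replicas(bpid);
      if (replicas != null) { // the block pool may not exist
        for (ReplicaInfo r : replicas) {
          // Inspect r; no other thread can mutate the map while
          // the lock is held.
        }
      }
    }
  }
}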

TestInterDatanodeProtocol.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -234,7 +235,7 @@ public class TestInterDatanodeProtocol {
     final long firstblockid = 10000L;
     final long gs = 7777L;
     final long length = 22L;
-    final ReplicaMap map = new ReplicaMap(this);
+    final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
     String bpid = "BP-TEST";
     final Block[] blocks = new Block[5];
     for(int i = 0; i < blocks.length; i++) {

TestReplicaMap.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.util.AutoCloseableLock;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -30,7 +31,7 @@ import org.junit.Test;
  * Unit test for ReplicasMap class
  */
 public class TestReplicaMap {
-  private final ReplicaMap map = new ReplicaMap(TestReplicaMap.class);
+  private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
   private final String bpid = "BP-TEST";
   private final Block block = new Block(1234, 1234, 1234);

TestWriteToReplica.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -534,7 +535,7 @@ public class TestWriteToReplica {
         bpList.size() == 2);
     createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
 
-    ReplicaMap oldReplicaMap = new ReplicaMap(this);
+    ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
     oldReplicaMap.addAll(dataSet.volumeMap);
 
     cluster.restartDataNode(0);