HDFS-10828. Fix usage of FsDatasetImpl object lock in ReplicaMap. (Arpit Agarwal)
This commit is contained in:
parent
d17f03b625
commit
80628ee2c3
|
@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
|
|||
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
|
@ -751,7 +752,7 @@ class BlockPoolSlice {
|
|||
|
||||
private boolean readReplicasFromCache(ReplicaMap volumeMap,
|
||||
final RamDiskReplicaTracker lazyWriteReplicaMap) {
|
||||
ReplicaMap tmpReplicaMap = new ReplicaMap(this);
|
||||
ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
||||
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
|
||||
// Check whether the file exists or not.
|
||||
if (!replicaFile.exists()) {
|
||||
|
|
|
@ -273,7 +273,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
private boolean blockPinningEnabled;
|
||||
private final int maxDataLength;
|
||||
|
||||
private final AutoCloseableLock datasetLock;
|
||||
@VisibleForTesting
|
||||
final AutoCloseableLock datasetLock;
|
||||
private final Condition datasetLockCondition;
|
||||
|
||||
/**
|
||||
|
@ -315,7 +316,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
|
||||
volumeMap = new ReplicaMap(this);
|
||||
volumeMap = new ReplicaMap(datasetLock);
|
||||
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -436,7 +437,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
FsVolumeImpl fsVolume = new FsVolumeImpl(
|
||||
this, sd.getStorageUuid(), dir, this.conf, storageType);
|
||||
FsVolumeReference ref = fsVolume.obtainReference();
|
||||
ReplicaMap tempVolumeMap = new ReplicaMap(this);
|
||||
ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
|
||||
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
|
||||
|
||||
activateVolume(tempVolumeMap, sd, storageType, ref);
|
||||
|
@ -470,7 +471,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
StorageType storageType = location.getStorageType();
|
||||
final FsVolumeImpl fsVolume =
|
||||
createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
|
||||
final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
|
||||
final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
|
||||
ArrayList<IOException> exceptions = Lists.newArrayList();
|
||||
|
||||
for (final NamespaceInfo nsInfo : nsInfos) {
|
||||
|
@ -2476,7 +2477,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
|
||||
while (true) {
|
||||
try {
|
||||
synchronized (map.getMutex()) {
|
||||
try (AutoCloseableLock lock = map.getLock().acquire()) {
|
||||
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
|
||||
}
|
||||
} catch (MustStopExistingWriter e) {
|
||||
|
|
|
@ -25,28 +25,29 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
|
|||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.util.LightWeightResizableGSet;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
|
||||
/**
|
||||
* Maintains the replica map.
|
||||
*/
|
||||
class ReplicaMap {
|
||||
// Object using which this class is synchronized
|
||||
private final Object mutex;
|
||||
// Lock object to synchronize this instance.
|
||||
private final AutoCloseableLock lock;
|
||||
|
||||
// Map of block pool Id to another map of block Id to ReplicaInfo.
|
||||
private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
|
||||
new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
|
||||
|
||||
ReplicaMap(Object mutex) {
|
||||
if (mutex == null) {
|
||||
ReplicaMap(AutoCloseableLock lock) {
|
||||
if (lock == null) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Object to synchronize on cannot be null");
|
||||
"Lock to synchronize on cannot be null");
|
||||
}
|
||||
this.mutex = mutex;
|
||||
this.lock = lock;
|
||||
}
|
||||
|
||||
String[] getBlockPoolList() {
|
||||
synchronized(mutex) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
return map.keySet().toArray(new String[map.keySet().size()]);
|
||||
}
|
||||
}
|
||||
|
@ -91,7 +92,7 @@ class ReplicaMap {
|
|||
*/
|
||||
ReplicaInfo get(String bpid, long blockId) {
|
||||
checkBlockPool(bpid);
|
||||
synchronized(mutex) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
return m != null ? m.get(new Block(blockId)) : null;
|
||||
}
|
||||
|
@ -108,7 +109,7 @@ class ReplicaMap {
|
|||
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
|
||||
checkBlockPool(bpid);
|
||||
checkBlock(replicaInfo);
|
||||
synchronized(mutex) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
if (m == null) {
|
||||
// Add an entry for block pool if it does not exist already
|
||||
|
@ -137,7 +138,7 @@ class ReplicaMap {
|
|||
ReplicaInfo remove(String bpid, Block block) {
|
||||
checkBlockPool(bpid);
|
||||
checkBlock(block);
|
||||
synchronized(mutex) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
if (m != null) {
|
||||
ReplicaInfo replicaInfo = m.get(block);
|
||||
|
@ -159,7 +160,7 @@ class ReplicaMap {
|
|||
*/
|
||||
ReplicaInfo remove(String bpid, long blockId) {
|
||||
checkBlockPool(bpid);
|
||||
synchronized(mutex) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
if (m != null) {
|
||||
return m.remove(new Block(blockId));
|
||||
|
@ -175,7 +176,7 @@ class ReplicaMap {
|
|||
*/
|
||||
int size(String bpid) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = null;
|
||||
synchronized(mutex) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
m = map.get(bpid);
|
||||
return m != null ? m.size() : 0;
|
||||
}
|
||||
|
@ -184,9 +185,9 @@ class ReplicaMap {
|
|||
/**
|
||||
* Get a collection of the replicas for given block pool
|
||||
* This method is <b>not synchronized</b>. It needs to be synchronized
|
||||
* externally using the mutex, both for getting the replicas
|
||||
* externally using the lock, both for getting the replicas
|
||||
* values from the map and iterating over it. Mutex can be accessed using
|
||||
* {@link #getMutext()} method.
|
||||
* {@link #getLock()} method.
|
||||
*
|
||||
* @param bpid block pool id
|
||||
* @return a collection of the replicas belonging to the block pool
|
||||
|
@ -199,7 +200,7 @@ class ReplicaMap {
|
|||
|
||||
void initBlockPool(String bpid) {
|
||||
checkBlockPool(bpid);
|
||||
synchronized(mutex) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
if (m == null) {
|
||||
// Add an entry for block pool if it does not exist already
|
||||
|
@ -211,16 +212,16 @@ class ReplicaMap {
|
|||
|
||||
void cleanUpBlockPool(String bpid) {
|
||||
checkBlockPool(bpid);
|
||||
synchronized(mutex) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
map.remove(bpid);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Give access to mutex used for synchronizing ReplicasMap
|
||||
* @return object used as lock
|
||||
* Get the lock object used for synchronizing ReplicasMap
|
||||
* @return lock object
|
||||
*/
|
||||
Object getMutex() {
|
||||
return mutex;
|
||||
AutoCloseableLock getLock() {
|
||||
return lock;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -408,7 +408,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
|
|||
@Override
|
||||
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
|
||||
// Reload replicas from the disk.
|
||||
ReplicaMap replicaMap = new ReplicaMap(dataset);
|
||||
ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock);
|
||||
try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
|
||||
for (FsVolumeSpi vol : refs) {
|
||||
FsVolumeImpl volume = (FsVolumeImpl) vol;
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.io.Writable;
|
|||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -234,7 +235,7 @@ public class TestInterDatanodeProtocol {
|
|||
final long firstblockid = 10000L;
|
||||
final long gs = 7777L;
|
||||
final long length = 22L;
|
||||
final ReplicaMap map = new ReplicaMap(this);
|
||||
final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
|
||||
String bpid = "BP-TEST";
|
||||
final Block[] blocks = new Block[5];
|
||||
for(int i = 0; i < blocks.length; i++) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -30,7 +31,7 @@ import org.junit.Test;
|
|||
* Unit test for ReplicasMap class
|
||||
*/
|
||||
public class TestReplicaMap {
|
||||
private final ReplicaMap map = new ReplicaMap(TestReplicaMap.class);
|
||||
private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
|
||||
private final String bpid = "BP-TEST";
|
||||
private final Block block = new Block(1234, 1234, 1234);
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -534,7 +535,7 @@ public class TestWriteToReplica {
|
|||
bpList.size() == 2);
|
||||
|
||||
createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
|
||||
ReplicaMap oldReplicaMap = new ReplicaMap(this);
|
||||
ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
||||
oldReplicaMap.addAll(dataSet.volumeMap);
|
||||
|
||||
cluster.restartDataNode(0);
|
||||
|
|
Loading…
Reference in New Issue