HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. Contributed by Stephen O'Donnell.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 2a67e2b1a0)
Author: Stephen O'Donnell (2020-06-30 07:09:26 -07:00), committed by He Xiaoqiao
parent 3c715a272a
commit 84e16adab3
9 changed files with 187 additions and 44 deletions

DFSConfigKeys.java

@@ -552,6 +552,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_LOCK_FAIR_KEY =
       "dfs.datanode.lock.fair";
   public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
+  public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
+      "dfs.datanode.lock.read.write.enabled";
+  public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
+      true;
   public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
       "dfs.datanode.lock-reporting-threshold-ms";
   public static final long

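For context, a minimal sketch of how the new switch is consumed; the class name ReadLockConfigCheck is hypothetical and not part of this commit. Operators would normally set dfs.datanode.lock.read.write.enabled in hdfs-site.xml, and the key defaults to true as defined above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Hypothetical illustration class; not part of this commit.
    public class ReadLockConfigCheck {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Reads dfs.datanode.lock.read.write.enabled, defaulting to true.
        boolean enabled = conf.getBoolean(
            DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
            DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
        System.out.println("DataNode shared read lock enabled: " + enabled);
      }
    }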
BlockSender.java

@@ -254,7 +254,7 @@ class BlockSender implements java.io.Closeable {
     // the append write.
     ChunkChecksum chunkChecksum = null;
     final long replicaVisibleLength;
-    try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
+    try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
       replica = getReplica(block, datanode);
       replicaVisibleLength = replica.getVisibleLength();
     }

DataNode.java

@@ -3010,7 +3010,7 @@ public class DataNode extends ReconfigurableBase
     final BlockConstructionStage stage;

     //get replica information
-    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
+    try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {

DiskBalancer.java

@@ -504,7 +504,7 @@ public class DiskBalancer {
     Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
     FsDatasetSpi.FsVolumeReferences references;
     try {
-      try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
+      try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);

FsDatasetSpi.java

@@ -657,12 +657,16 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
       FsVolumeSpi destination) throws IOException;

   /**
-   * Acquire the lock of the data set.
+   * Acquire the lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible, for example modifying the genStamp of a block instance.
    */
   AutoCloseableLock acquireDatasetLock();

   /***
-   * Acquire the read lock of the data set.
+   * Acquire the read lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible, for example modifying the genStamp of a block instance.
    * @return The AutoCloseable read lock instance.
    */
   AutoCloseableLock acquireDatasetReadLock();

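Both methods return an already-held AutoCloseableLock, so callers hold it in try-with-resources, as the BlockSender and DataNode changes above do. A usage sketch; the helper countReplicas is hypothetical, but deepCopyReplica and acquireDatasetReadLock are the interface methods shown in this diff:

    // A usage sketch, assuming "dataset" is any FsDatasetSpi implementation.
    static long countReplicas(FsDatasetSpi<?> dataset, String bpid)
        throws IOException {
      try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
        // Read-only access: concurrent readers are allowed, but threads that
        // modify the volume map structure are excluded until the lock closes.
        return dataset.deepCopyReplica(bpid).size();
      }
    }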
FsDatasetImpl.java

@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;

 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -179,7 +178,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {

   @Override
   public FsVolumeImpl getVolume(final ExtendedBlock b) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final ReplicaInfo r =
           volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
       return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -189,7 +188,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       ReplicaInfo r = volumeMap.get(bpid, blkid);
       if (r == null) {
         return null;
@@ -206,7 +205,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     Set<? extends Replica> replicas = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       replicas =
           new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
               : volumeMap.replicas(bpid));
@@ -302,7 +301,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
         TimeUnit.MILLISECONDS));
     this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
-    this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    boolean enableRL = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
+    // The read lock can be disabled by the above config key. If it is
+    // disabled, we simply make both the read and write lock variables hold
+    // the write lock. All accesses to the lock are via these variables, so
+    // that effectively disables the read lock.
+    if (enableRL) {
+      LOG.info("The datanode lock is a read write lock");
+      this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    } else {
+      LOG.info("The datanode lock is an exclusive write lock");
+      this.datasetReadLock = this.datasetWriteLock;
+    }
     this.datasetWriteLockCondition = datasetWriteLock.newCondition();

     // The number of volumes required for operation is the total number
@@ -342,7 +354,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }

     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
-    volumeMap = new ReplicaMap(datasetRWLock);
+    volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
     ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);

     @SuppressWarnings("unchecked")
@@ -475,7 +487,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         .setConf(this.conf)
         .build();
     FsVolumeReference ref = fsVolume.obtainReference();
-    ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
+    ReplicaMap tempVolumeMap =
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);

     activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@@ -515,7 +528,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final FsVolumeImpl fsVolume =
         createFsVolume(sd.getStorageUuid(), sd, location);
     final ReplicaMap tempVolumeMap =
-        new ReplicaMap(new ReentrantReadWriteLock());
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     ArrayList<IOException> exceptions = Lists.newArrayList();

     for (final NamespaceInfo nsInfo : nsInfos) {
@@ -810,7 +823,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       long seekOffset) throws IOException {

     ReplicaInfo info;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }

@@ -898,7 +911,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
@@ -1023,7 +1036,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }

     FsVolumeReference volumeRef = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
           block.getNumBytes());
     }
@@ -1137,7 +1150,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {

     FsVolumeReference volumeRef = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       volumeRef = destination.obtainReference();
     }

@@ -1891,7 +1904,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         new HashMap<String, BlockListAsLongs.Builder>();

     List<FsVolumeImpl> curVolumes = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       curVolumes = volumes.getVolumes();
       for (FsVolumeSpi v : curVolumes) {
         builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1954,7 +1967,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
           volumeMap.size(bpid));
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2047,9 +2060,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   ReplicaInfo validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final ReplicaInfo r;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
-      r = volumeMap.get(bpid, blockId);
-    }
+    r = volumeMap.get(bpid, blockId);
     if (r != null) {
       if (r.blockDataExists()) {
         return r;
@@ -2292,7 +2303,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {

   @Override // FsDatasetSpi
   public boolean contains(final ExtendedBlock block) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final long blockId = block.getLocalBlock().getBlockId();
       final String bpid = block.getBlockPoolId();
       final ReplicaInfo r = volumeMap.get(bpid, blockId);
@@ -2613,7 +2624,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {

   @Override
   public String getReplicaString(String bpid, long blockId) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica r = volumeMap.get(bpid, blockId);
       return r == null ? "null" : r.toString();
     }
@@ -2833,7 +2844,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -2983,18 +2994,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {

   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
         throw new ReplicaNotFoundException(block);
       }
-      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-        throw new IOException(
-            "Replica generation stamp < block generation stamp, block="
-            + block + ", replica=" + replica);
-      } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
-        block.setGenerationStamp(replica.getGenerationStamp());
+      synchronized(replica) {
+        if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+          throw new IOException(
+              "Replica generation stamp < block generation stamp, block="
+              + block + ", replica=" + replica);
+        } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
+          block.setGenerationStamp(replica.getGenerationStamp());
+        }
       }
     }

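The disable switch in the constructor above works purely by aliasing: when the shared read lock is off, both lock fields hold the same exclusive write lock. A minimal self-contained sketch of that pattern under assumed names (DatasetLockDemo and readPath are illustrative, not from this commit):

    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import org.apache.hadoop.util.AutoCloseableLock;

    // Illustrative class; mirrors the constructor logic above.
    public class DatasetLockDemo {
      private final ReentrantReadWriteLock rwLock =
          new ReentrantReadWriteLock(true);
      private final AutoCloseableLock writeLock =
          new AutoCloseableLock(rwLock.writeLock());
      private final AutoCloseableLock readLock;

      public DatasetLockDemo(boolean readWriteLockEnabled) {
        // When disabled, the "read" lock variable simply aliases the write
        // lock, so every read-path acquisition is exclusive, as before.
        this.readLock = readWriteLockEnabled
            ? new AutoCloseableLock(rwLock.readLock())
            : this.writeLock;
      }

      public int readPath() {
        try (AutoCloseableLock l = readLock.acquire()) {
          return 42; // read shared structures here
        }
      }
    }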
ReplicaMap.java

@@ -32,7 +32,6 @@ import org.apache.hadoop.util.AutoCloseableLock;
  * Maintains the replica map.
  */
 class ReplicaMap {
-  private final ReadWriteLock rwLock;
   // Lock object to synchronize this instance.
   private final AutoCloseableLock readLock;
   private final AutoCloseableLock writeLock;
@@ -41,18 +40,22 @@ class ReplicaMap {
   private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
       new HashMap<>();

-  ReplicaMap(ReadWriteLock lock) {
-    if (lock == null) {
+  ReplicaMap(AutoCloseableLock rLock, AutoCloseableLock wLock) {
+    if (rLock == null || wLock == null) {
       throw new HadoopIllegalArgumentException(
           "Lock to synchronize on cannot be null");
     }
-    this.rwLock = lock;
-    this.readLock = new AutoCloseableLock(rwLock.readLock());
-    this.writeLock = new AutoCloseableLock(rwLock.writeLock());
+    this.readLock = rLock;
+    this.writeLock = wLock;
+  }
+
+  ReplicaMap(ReadWriteLock lock) {
+    this(new AutoCloseableLock(lock.readLock()),
+        new AutoCloseableLock(lock.writeLock()));
   }

   String[] getBlockPoolList() {
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);
     }
   }
@@ -97,7 +100,7 @@ class ReplicaMap {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.get(new Block(blockId)) : null;
     }
@@ -218,7 +221,7 @@ class ReplicaMap {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.size() : 0;
     }
@@ -266,4 +269,14 @@ class ReplicaMap {
   AutoCloseableLock getLock() {
     return writeLock;
   }
+
+  /**
+   * Get the lock object used for synchronizing the ReplicasMap for read only
+   * operations.
+   * @return The read lock object
+   */
+  AutoCloseableLock getReadLock() {
+    return readLock;
+  }
 }

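One consequence of the new two-lock constructor: the dataset-level map and any temporary per-volume map can now share one pair of AutoCloseableLock instances, so all of them are guarded by the same underlying ReentrantReadWriteLock. A sketch, assuming code in the same package since ReplicaMap is package-private (buildMaps is a hypothetical helper):

    // Sketch only; would live in o.a.h.hdfs.server.datanode.fsdataset.impl.
    static void buildMaps() {
      ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
      AutoCloseableLock readLock = new AutoCloseableLock(rwLock.readLock());
      AutoCloseableLock writeLock = new AutoCloseableLock(rwLock.writeLock());

      // Both maps are guarded by the same underlying lock, matching the
      // volumeMap / tempVolumeMap changes in FsDatasetImpl above.
      ReplicaMap volumeMap = new ReplicaMap(readLock, writeLock);
      ReplicaMap tempVolumeMap = new ReplicaMap(readLock, writeLock);

      // The retained ReadWriteLock constructor still builds a standalone
      // map, e.g. for tests, by wrapping both sides of the lock itself.
      ReplicaMap standalone = new ReplicaMap(new ReentrantReadWriteLock());
    }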
hdfs-default.xml

@@ -3009,6 +3009,19 @@
   </description>
 </property>

+<property>
+  <name>dfs.datanode.lock.read.write.enabled</name>
+  <value>true</value>
+  <description>If this is true, the FsDataset lock will be a read write lock.
+    If it is false, all locks will use the exclusive write lock.
+    Enabling this should give better datanode throughput, as many read-only
+    functions can run concurrently under the read lock, where they would
+    previously have required the exclusive write lock. As the feature is
+    experimental, this switch can be used to disable the shared read lock and
+    cause all lock acquisitions to use the exclusive write lock.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.lock-reporting-threshold-ms</name>
   <value>300</value>

TestFsDatasetImpl.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import java.util.function.Supplier;
 import com.google.common.collect.Lists;

-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import org.apache.commons.io.FileUtils;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -85,6 +85,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;

 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -196,6 +197,101 @@ public class TestFsDatasetImpl {
     assertEquals(0, dataset.getNumFailedVolumes());
   }

+  @Test
+  public void testReadLockEnabledByDefault()
+      throws IOException, InterruptedException {
+    final FsDatasetSpi ds = dataset;
+    AtomicBoolean accessed = new AtomicBoolean(false);
+    CountDownLatch latch = new CountDownLatch(1);
+    CountDownLatch waiterLatch = new CountDownLatch(1);
+
+    Thread holder = new Thread() {
+      public void run() {
+        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+          latch.countDown();
+          sleep(10000);
+        } catch (Exception e) {
+        }
+      }
+    };
+
+    Thread waiter = new Thread() {
+      public void run() {
+        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+          waiterLatch.countDown();
+          accessed.getAndSet(true);
+        } catch (Exception e) {
+        }
+      }
+    };
+
+    holder.start();
+    latch.await();
+    waiter.start();
+    waiterLatch.await();
+    // The holder thread is still holding the lock, but the waiter can still
+    // run as the lock is a shared read lock.
+    assertEquals(true, accessed.get());
+    holder.interrupt();
+    holder.join();
+    waiter.join();
+  }
+
+  @Test(timeout=10000)
+  public void testReadLockCanBeDisabledByConfig()
+      throws IOException, InterruptedException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
+
+      CountDownLatch latch = new CountDownLatch(1);
+      CountDownLatch waiterLatch = new CountDownLatch(1);
+      AtomicBoolean accessed = new AtomicBoolean(false);
+
+      Thread holder = new Thread() {
+        public void run() {
+          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+            latch.countDown();
+            sleep(10000);
+          } catch (Exception e) {
+          }
+        }
+      };
+
+      Thread waiter = new Thread() {
+        public void run() {
+          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+            accessed.getAndSet(true);
+            waiterLatch.countDown();
+          } catch (Exception e) {
+          }
+        }
+      };
+
+      holder.start();
+      latch.await();
+      waiter.start();
+      Thread.sleep(200);
+      // The waiting thread should not have been able to update the variable
+      // as the read lock is disabled and hence an exclusive lock.
+      assertEquals(false, accessed.get());
+      holder.interrupt();
+      holder.join();
+      waiterLatch.await();
+      // After the holder thread exits, the variable is updated.
+      assertEquals(true, accessed.get());
+      waiter.join();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testAddVolumes() throws IOException {
     final int numNewVolumes = 3;
@@ -242,8 +338,8 @@ public class TestFsDatasetImpl {

   @Test
   public void testAddVolumeWithSameStorageUuid() throws IOException {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+    HdfsConfiguration config = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
         .numDataNodes(1).build();
     try {
       cluster.waitActive();