HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. (#3200)

* HDFS-15150. Introduce read write lock to Datanode. Contributed by Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit d7c136b9ed6d99e1b03f5b89723b3a20df359ba8)

* HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. Contributed by Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 2a67e2b1a0e3a5f91056f5b977ef9c4c07ba6718)

* HDFS-15457. TestFsDatasetImpl fails intermittently (#2407)
(cherry picked from commit 98097b8f19789605b9697f6a959da57261e0fe19)

* HDFS-15818. Fix TestFsDatasetImpl.testReadLockCanBeDisabledByConfig. Contributed by Leon Gao (#2679)
(cherry picked from commit 9434c1eccc255a25ea5e11f6d8c9e1f83996d6b4)

* HDFS-15160. amend to fix javac error suppressing unchecked warning
Co-authored-by: Stephen O'Donnell <sodonnel@apache.org>
Co-authored-by: LeonGao <liangg@uber.com>
Ahmed Hussein 2021-09-11 06:52:16 -05:00 committed by GitHub
parent 1944e0d714
commit 318bc5ff69
20 changed files with 345 additions and 114 deletions
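
The change, repeated across the files below, is to take a shared read lock instead of the exclusive dataset lock on read-only paths. A minimal sketch of the resulting call pattern, assuming a DataNode-side FsDatasetSpi reference named data (the helper method itself is illustrative and not part of this patch):

// Illustrative only: read-only lookups take the shared read lock, so the
// block sender, directory scanner and disk balancer no longer serialize
// on the exclusive dataset lock.
Block lookupStoredBlock(FsDatasetSpi<?> data, String bpid, long blockId)
    throws IOException {
  try (AutoCloseableLock lock = data.acquireDatasetReadLock()) {
    return data.getStoredBlock(bpid, blockId);
  }
}
// Structural changes to the volume map (adding or removing replicas and
// volumes) continue to go through data.acquireDatasetLock().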


@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock {
private final Lock readLock;
private final Lock writeLock;
InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
public InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
long minLoggingGapMs, long lockWarningThresholdMs) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
readLock = new InstrumentedReadLock(name, logger, readWriteLock,


@ -549,6 +549,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.lock.suppress.warning.interval";
public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
10000; //ms
public static final String DFS_DATANODE_LOCK_FAIR_KEY =
"dfs.datanode.lock.fair";
public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
"dfs.datanode.lock.read.write.enabled";
public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
true;
public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
"dfs.datanode.lock-reporting-threshold-ms";
public static final long
DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L;
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;


@ -254,7 +254,7 @@ class BlockSender implements java.io.Closeable {
// the append write.
ChunkChecksum chunkChecksum = null;
final long replicaVisibleLength;
try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}


@ -3010,7 +3010,7 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final BlockConstructionStage stage;
//get replica information
try(AutoCloseableLock lock = data.acquireDatasetLock()) {
try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {


@ -504,7 +504,7 @@ private Map<String, String> getStorageIDToVolumeBasePathMap()
Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
FsDatasetSpi.FsVolumeReferences references;
try {
try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
references = this.dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);


@ -657,9 +657,19 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
FsVolumeSpi destination) throws IOException;
/**
* Acquire the lock of the data set.
* Acquire the lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example modifying the genStamp of a block instance.
*/
AutoCloseableLock acquireDatasetLock();
/***
* Acquire the read lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example modifying the genStamp of a block instance.
* @return The AutoClosable read lock instance.
*/
AutoCloseableLock acquireDatasetReadLock();
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
}
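
A simplified sketch of how an implementation can back both acquire methods with a single ReentrantReadWriteLock, mirroring what FsDatasetImpl does further down in this diff (field names are illustrative and the instrumentation wrapper is omitted, so this is not the actual implementation):

// Hypothetical excerpt from an FsDatasetSpi implementation; the rest of
// the interface is omitted.
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
private final AutoCloseableLock writeLock =
    new AutoCloseableLock(rwLock.writeLock());
private final AutoCloseableLock readLock =
    new AutoCloseableLock(rwLock.readLock());

@Override
public AutoCloseableLock acquireDatasetLock() {
  return writeLock.acquire();   // exclusive
}

@Override
public AutoCloseableLock acquireDatasetReadLock() {
  return readLock.acquire();    // shared
}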


@ -42,6 +42,7 @@
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
import org.slf4j.Logger;
@ -66,7 +67,6 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.DiskChecker;
@ -874,7 +874,7 @@ void shutdown(BlockListAsLongs blocksListToPersist) {
private boolean readReplicasFromCache(ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap) {
ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
// Check whether the file exists or not.
if (!replicaFile.exists()) {


@ -40,7 +40,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
@ -112,7 +111,7 @@
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.InstrumentedLock;
import org.apache.hadoop.util.InstrumentedReadWriteLock;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
@ -179,7 +178,7 @@ public StorageReport[] getStorageReports(String bpid)
@Override
public FsVolumeImpl getVolume(final ExtendedBlock b) {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final ReplicaInfo r =
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null ? (FsVolumeImpl) r.getVolume() : null;
@ -189,7 +188,7 @@ public FsVolumeImpl getVolume(final ExtendedBlock b) {
@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid)
throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
ReplicaInfo r = volumeMap.get(bpid, blkid);
if (r == null) {
return null;
@ -202,12 +201,16 @@ public Block getStoredBlock(String bpid, long blkid)
* The deepCopyReplica call doesn't use the dataset lock since it could lead to
* a potential deadlock with the {@link FsVolumeList#addBlockPool} call.
*/
@SuppressWarnings("unchecked")
@Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
Set<? extends Replica> replicas =
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
: volumeMap.replicas(bpid));
Set<? extends Replica> replicas;
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
replicas =
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
: volumeMap.replicas(bpid));
}
return Collections.unmodifiableSet(replicas);
}
@ -268,8 +271,12 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
private final int maxDataLength;
@VisibleForTesting
final AutoCloseableLock datasetLock;
private final Condition datasetLockCondition;
final AutoCloseableLock datasetWriteLock;
@VisibleForTesting
final AutoCloseableLock datasetReadLock;
@VisibleForTesting
final InstrumentedReadWriteLock datasetRWLock;
private final Condition datasetWriteLockCondition;
private static String blockPoolId = "";
/**
@ -282,15 +289,33 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
this.datasetLock = new AutoCloseableLock(
new InstrumentedLock(getClass().getName(), LOG,
new ReentrantLock(true),
conf.getTimeDuration(
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS),
300));
this.datasetLockCondition = datasetLock.newCondition();
this.datasetRWLock = new InstrumentedReadWriteLock(
conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT),
"FsDatasetRWLock", LOG, conf.getTimeDuration(
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS),
conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
TimeUnit.MILLISECONDS));
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
boolean enableRL = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
// The read lock can be disabled by the above config key. If it is disabled
// then we simply make both the read and write lock variables hold
// the write lock. All accesses to the lock are via these variables, so that
// effectively disables the read lock.
if (enableRL) {
LOG.info("The datanode lock is a read write lock");
this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
} else {
LOG.info("The datanode lock is an exclusive write lock");
this.datasetReadLock = this.datasetWriteLock;
}
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
@ -329,7 +354,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
}
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(datasetLock);
volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
@SuppressWarnings("unchecked")
@ -383,7 +408,12 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
@Override
public AutoCloseableLock acquireDatasetLock() {
return datasetLock.acquire();
return datasetWriteLock.acquire();
}
@Override
public AutoCloseableLock acquireDatasetReadLock() {
return datasetReadLock.acquire();
}
/**
@ -424,7 +454,7 @@ private void activateVolume(
ReplicaMap replicaMap,
Storage.StorageDirectory sd, StorageType storageType,
FsVolumeReference ref) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
if (dnStorage != null) {
final String errorMsg = String.format(
@ -457,7 +487,8 @@ private void addVolume(Storage.StorageDirectory sd) throws IOException {
.setConf(this.conf)
.build();
FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
ReplicaMap tempVolumeMap =
new ReplicaMap(datasetReadLock, datasetWriteLock);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@ -496,7 +527,8 @@ public void addVolume(final StorageLocation location,
StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume =
createFsVolume(sd.getStorageUuid(), sd, location);
final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
final ReplicaMap tempVolumeMap =
new ReplicaMap(datasetReadLock, datasetWriteLock);
ArrayList<IOException> exceptions = Lists.newArrayList();
for (final NamespaceInfo nsInfo : nsInfos) {
@ -541,7 +573,7 @@ public void removeVolumes(
new ArrayList<>(storageLocsToRemove);
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
List<String> storageToRemove = new ArrayList<>();
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
final StorageLocation sdLocation = sd.getStorageLocation();
@ -553,7 +585,7 @@ public void removeVolumes(
// Disable the volume from the service.
asyncDiskService.removeVolume(sd.getStorageUuid());
volumes.removeVolume(sdLocation, clearFailure);
volumes.waitVolumeRemoved(5000, datasetLockCondition);
volumes.waitVolumeRemoved(5000, datasetWriteLockCondition);
// Removed all replica information for the blocks on the volume.
// Unlike updating the volumeMap in addVolume(), this operation does
@ -600,7 +632,7 @@ public void removeVolumes(
}
}
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for(String storageUuid : storageToRemove) {
storageMap.remove(storageUuid);
}
@ -791,7 +823,7 @@ public InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException {
ReplicaInfo info;
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
}
@ -879,7 +911,7 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid)
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
@ -1004,7 +1036,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
}
FsVolumeReference volumeRef = null;
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
block.getNumBytes());
}
@ -1118,7 +1150,7 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, FsVolumeSpi
FsVolumeReference volumeRef = null;
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
volumeRef = destination.obtainReference();
}
@ -1206,7 +1238,7 @@ static void computeChecksum(ReplicaInfo srcReplica, File dstMeta,
@Override // FsDatasetSpi
public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
@ -1258,7 +1290,7 @@ public ReplicaHandler append(ExtendedBlock b,
private ReplicaInPipeline append(String bpid,
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// If the block is cached, start uncaching it.
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new IOException("Only a Finalized replica can be appended to; "
@ -1354,7 +1386,7 @@ public ReplicaHandler recoverAppend(
while (true) {
try {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
ReplicaInPipeline replica;
@ -1386,7 +1418,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
LOG.info("Recover failed close " + b);
while (true) {
try {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
@ -1408,7 +1440,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
public ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
@ -1479,7 +1511,7 @@ public ReplicaHandler recoverRbw(
while (true) {
try {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo =
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
@ -1504,7 +1536,7 @@ public ReplicaHandler recoverRbw(
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
@ -1565,7 +1597,7 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
@ -1639,7 +1671,7 @@ public ReplicaHandler createTemporary(StorageType storageType,
ReplicaInfo lastFoundReplicaInfo = null;
boolean isInPipeline = false;
do {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo currentReplicaInfo =
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (currentReplicaInfo == lastFoundReplicaInfo) {
@ -1692,7 +1724,7 @@ public ReplicaHandler createTemporary(StorageType storageType,
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
false);
}
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
@ -1743,7 +1775,7 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
throws IOException {
ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null;
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
@ -1774,7 +1806,7 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// Compare generation stamp of old and new replica before finalizing
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
> replicaInfo.getGenerationStamp()) {
@ -1819,7 +1851,7 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
*/
@Override // FsDatasetSpi
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo != null &&
@ -1872,7 +1904,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = null;
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@ -1927,7 +1959,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
* Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
* {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
@ -1935,7 +1967,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
*/
@Override
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@ -2028,9 +2060,7 @@ private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
ReplicaInfo validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too?
final ReplicaInfo r;
try (AutoCloseableLock lock = datasetLock.acquire()) {
r = volumeMap.get(bpid, blockId);
}
r = volumeMap.get(bpid, blockId);
if (r != null) {
if (r.blockDataExists()) {
return r;
@ -2079,7 +2109,7 @@ private void invalidate(String bpid, Block[] invalidBlks, boolean async)
for (int i = 0; i < invalidBlks.length; i++) {
final ReplicaInfo removing;
final FsVolumeImpl v;
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
if (info == null) {
ReplicaInfo infoByBlockId =
@ -2205,7 +2235,7 @@ private void cacheBlock(String bpid, long blockId) {
long length, genstamp;
Executor volumeExecutor;
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
boolean success = false;
try {
@ -2273,7 +2303,7 @@ public boolean isCached(String bpid, long blockId) {
@Override // FsDatasetSpi
public boolean contains(final ExtendedBlock block) {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final long blockId = block.getLocalBlock().getBlockId();
final String bpid = block.getBlockPoolId();
final ReplicaInfo r = volumeMap.get(bpid, blockId);
@ -2393,7 +2423,7 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
Block corruptBlock = null;
ReplicaInfo memBlockInfo;
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null &&
memBlockInfo.getState() != ReplicaState.FINALIZED) {
@ -2594,7 +2624,7 @@ public ReplicaInfo getReplica(String bpid, long blockId) {
@Override
public String getReplicaString(String bpid, long blockId) {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica r = volumeMap.get(bpid, blockId);
return r == null ? "null" : r.toString();
}
@ -2701,7 +2731,7 @@ public Replica updateReplicaUnderRecovery(
final long recoveryId,
final long newBlockId,
final long newlength) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
//get replica
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@ -2814,7 +2844,7 @@ private ReplicaInfo updateReplicaUnderRecovery(
@Override // FsDatasetSpi
public long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
block.getBlockId());
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@ -2831,7 +2861,7 @@ public void addBlockPool(String bpid, Configuration conf)
throws IOException {
LOG.info("Adding block pool " + bpid);
AddBlockPoolException volumeExceptions = new AddBlockPoolException();
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try {
volumes.addBlockPool(bpid, conf);
} catch (AddBlockPoolException e) {
@ -2861,7 +2891,7 @@ public static void setBlockPoolId(String bpid) {
@Override
public void shutdownBlockPool(String bpid) {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
LOG.info("Removing block pool " + bpid);
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
= getBlockReports(bpid);
@ -2935,7 +2965,7 @@ public Map<String, Object> getVolumeInfoMap() {
@Override //FsDatasetSpi
public void deleteBlockPool(String bpid, boolean force)
throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
if (!force) {
for (FsVolumeImpl volume : curVolumes) {
@ -2964,18 +2994,20 @@ public void deleteBlockPool(String bpid, boolean force)
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica replica = volumeMap.get(block.getBlockPoolId(),
block.getBlockId());
if (replica == null) {
throw new ReplicaNotFoundException(block);
}
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException(
"Replica generation stamp < block generation stamp, block="
+ block + ", replica=" + replica);
} else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
block.setGenerationStamp(replica.getGenerationStamp());
synchronized(replica) {
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException(
"Replica generation stamp < block generation stamp, block="
+ block + ", replica=" + replica);
} else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
block.setGenerationStamp(replica.getGenerationStamp());
}
}
}
@ -3016,7 +3048,7 @@ public void clearRollingUpgradeMarker(String bpid) throws IOException {
@Override
public void onCompleteLazyPersist(String bpId, long blockId,
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@ -3150,7 +3182,7 @@ private boolean saveNextReplica() {
try {
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
// If replicaInfo is null, the block was either deleted before
@ -3217,7 +3249,7 @@ public void evictBlocks(long bytesNeeded) throws IOException {
ReplicaInfo replicaInfo, newReplicaInfo;
final String bpid = replicaState.getBlockPoolId();
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
replicaState.getBlockId());
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@ -3390,7 +3422,7 @@ public int getVolumeCount() {
}
void stopAllDataxceiverThreads(FsVolumeImpl volume) {
try (AutoCloseableLock lock = datasetLock.acquire()) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for (String bpid : volumeMap.getBlockPoolList()) {
Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
for (ReplicaInfo replicaInfo : replicas) {


@ -28,6 +28,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -53,9 +54,10 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.util.Timer;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
@ -63,8 +65,6 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES;
@ -135,7 +135,7 @@ static class ProvidedBlockPoolSlice {
ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
Configuration conf) {
this.providedVolume = volume;
bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
bpVolumeMap = new ReplicaMap(new ReentrantReadWriteLock());
Class<? extends BlockAliasMap> fmt =
conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
TextFileRegionAliasMap.class, BlockAliasMap.class);


@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block;
@ -32,22 +33,29 @@
*/
class ReplicaMap {
// Lock object to synchronize this instance.
private final AutoCloseableLock lock;
private final AutoCloseableLock readLock;
private final AutoCloseableLock writeLock;
// Map of block pool Id to another map of block Id to ReplicaInfo.
private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
new HashMap<>();
ReplicaMap(AutoCloseableLock lock) {
if (lock == null) {
ReplicaMap(AutoCloseableLock rLock, AutoCloseableLock wLock) {
if (rLock == null || wLock == null) {
throw new HadoopIllegalArgumentException(
"Lock to synchronize on cannot be null");
}
this.lock = lock;
this.readLock = rLock;
this.writeLock = wLock;
}
ReplicaMap(ReadWriteLock lock) {
this(new AutoCloseableLock(lock.readLock()),
new AutoCloseableLock(lock.writeLock()));
}
String[] getBlockPoolList() {
try (AutoCloseableLock l = lock.acquire()) {
try (AutoCloseableLock l = readLock.acquire()) {
return map.keySet().toArray(new String[map.keySet().size()]);
}
}
@ -92,7 +100,7 @@ ReplicaInfo get(String bpid, Block block) {
*/
ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid);
try (AutoCloseableLock l = lock.acquire()) {
try (AutoCloseableLock l = readLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.get(new Block(blockId)) : null;
}
@ -109,7 +117,7 @@ ReplicaInfo get(String bpid, long blockId) {
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
try (AutoCloseableLock l = lock.acquire()) {
try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
@ -127,7 +135,7 @@ ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
try (AutoCloseableLock l = lock.acquire()) {
try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
@ -176,7 +184,7 @@ void mergeAll(ReplicaMap other) {
ReplicaInfo remove(String bpid, Block block) {
checkBlockPool(bpid);
checkBlock(block);
try (AutoCloseableLock l = lock.acquire()) {
try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
ReplicaInfo replicaInfo = m.get(block);
@ -198,7 +206,7 @@ ReplicaInfo remove(String bpid, Block block) {
*/
ReplicaInfo remove(String bpid, long blockId) {
checkBlockPool(bpid);
try (AutoCloseableLock l = lock.acquire()) {
try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
return m.remove(new Block(blockId));
@ -213,7 +221,7 @@ ReplicaInfo remove(String bpid, long blockId) {
* @return the number of replicas in the map
*/
int size(String bpid) {
try (AutoCloseableLock l = lock.acquire()) {
try (AutoCloseableLock l = readLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.size() : 0;
}
@ -237,7 +245,7 @@ Collection<ReplicaInfo> replicas(String bpid) {
void initBlockPool(String bpid) {
checkBlockPool(bpid);
try (AutoCloseableLock l = lock.acquire()) {
try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
@ -249,7 +257,7 @@ void initBlockPool(String bpid) {
void cleanUpBlockPool(String bpid) {
checkBlockPool(bpid);
try (AutoCloseableLock l = lock.acquire()) {
try (AutoCloseableLock l = writeLock.acquire()) {
map.remove(bpid);
}
}
@ -259,6 +267,16 @@ void cleanUpBlockPool(String bpid) {
* @return lock object
*/
AutoCloseableLock getLock() {
return lock;
return writeLock;
}
/**
* Get the lock object used for synchronizing the ReplicasMap for read only
* operations.
* @return The read lock object
*/
AutoCloseableLock getReadLock() {
return readLock;
}
}
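
The two-lock ReplicaMap can also be built directly from a ReentrantReadWriteLock via the new convenience constructor, as the tests below do. A small illustrative sketch (the block pool id and block values are made up):

// Standalone ReplicaMap guarded by its own read/write lock pair.
ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
String bpid = "BP-TEST";
map.initBlockPool(bpid);                                   // write lock
map.add(bpid, new FinalizedReplica(new Block(1L, 0L, 1001L), null, null));
ReplicaInfo replica = map.get(bpid, 1L);                   // read lock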


@ -2999,6 +2999,40 @@
</description>
</property>
<property>
<name>dfs.datanode.lock.fair</name>
<value>true</value>
<description>If this is true, the Datanode FsDataset lock will be used in Fair
mode, which will help to prevent writer threads from being starved, but can
lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock
for more information on fair/non-fair locks.
</description>
</property>
<property>
<name>dfs.datanode.lock.read.write.enabled</name>
<value>true</value>
<description>If this is true, the FsDataset lock will be a read write lock. If
it is false, all locking will use the exclusive write lock.
Enabling this should give better datanode throughput, as many read only
functions can run concurrently under the read lock, when they would
previously have required the exclusive write lock. As the feature is
experimental, this switch can be used to disable the shared read lock, and
cause all lock acquisitions to use the exclusive write lock.
</description>
</property>
<property>
<name>dfs.datanode.lock-reporting-threshold-ms</name>
<value>300</value>
<description>When a thread waits to obtain a lock or holds a lock for longer
than this threshold, a log message will be written. Note that
dfs.lock.suppress.warning.interval ensures a single log message is
emitted per interval for waiting threads and a single message for holding
threads to avoid excessive logging.
</description>
</property>
<property>
<name>dfs.namenode.startup.delay.block.deletion.sec</name>
<value>0</value>
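
For reference, the same settings can also be applied programmatically; a hedged sketch using the keys added above (the values shown are examples only, not recommendations):

HdfsConfiguration conf = new HdfsConfiguration();
// Keep fair ordering (the default) so writer threads are not starved.
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY, true);
// Fall back to a single exclusive lock if the shared read lock causes trouble.
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
// Report lock waits/holds longer than 500 ms instead of the 300 ms default.
conf.setLong(DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY, 500L);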


@ -1572,6 +1572,12 @@ public AutoCloseableLock acquireDatasetLock() {
return datasetLock.acquire();
}
@Override
public AutoCloseableLock acquireDatasetReadLock() {
// No RW lock implementation in simulated dataset currently.
return datasetLock.acquire();
}
@Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {


@ -455,6 +455,11 @@ public AutoCloseableLock acquireDatasetLock() {
return null;
}
@Override
public AutoCloseableLock acquireDatasetReadLock() {
return null;
}
@Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {


@ -434,7 +434,7 @@ public void changeStoredGenerationStamp(
@Override
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
// Reload replicas from the disk.
ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock);
ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock);
try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
for (FsVolumeSpi vol : refs) {
FsVolumeImpl volume = (FsVolumeImpl) vol;


@ -17,10 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import com.google.common.collect.Lists;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
@ -65,6 +65,7 @@
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
@ -85,6 +86,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@ -196,6 +198,118 @@ public void setUp() throws IOException {
assertEquals(0, dataset.getNumFailedVolumes());
}
@Test(timeout=10000)
public void testReadLockEnabledByDefault()
throws Exception {
final FsDatasetSpi ds = dataset;
AtomicBoolean accessed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch waiterLatch = new CountDownLatch(1);
Thread holder = new Thread() {
public void run() {
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
latch.countDown();
// wait for the waiter thread to access the lock.
waiterLatch.await();
} catch (Exception e) {
}
}
};
Thread waiter = new Thread() {
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
waiterLatch.countDown();
return;
}
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
accessed.getAndSet(true);
// signal the holder thread.
waiterLatch.countDown();
} catch (Exception e) {
}
}
};
waiter.start();
holder.start();
holder.join();
waiter.join();
// The holder thread is still holding the lock, but the waiter can still
// run as the lock is a shared read lock.
// Otherwise test will timeout with deadlock.
assertEquals(true, accessed.get());
holder.interrupt();
}
@Test(timeout=20000)
public void testReadLockCanBeDisabledByConfig()
throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
try {
AtomicBoolean accessed = new AtomicBoolean(false);
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch waiterLatch = new CountDownLatch(1);
Thread holder = new Thread() {
public void run() {
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
latch.countDown();
// wait for the waiter thread to access the lock.
waiterLatch.await();
} catch (Exception e) {
}
}
};
Thread waiter = new Thread() {
public void run() {
try {
// Wait for holder to get ds read lock.
latch.await();
} catch (InterruptedException e) {
waiterLatch.countDown();
return;
}
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
accessed.getAndSet(true);
// signal the holder thread.
waiterLatch.countDown();
} catch (Exception e) {
}
}
};
waiter.start();
holder.start();
// Wait for some time to make sure we are in deadlock.
try {
GenericTestUtils.waitFor(() ->
accessed.get(),
100, 10000);
fail("Waiter thread should not execute.");
} catch (TimeoutException e) {
}
// Release waiterLatch to exit deadlock.
waiterLatch.countDown();
holder.join();
waiter.join();
// After releasing waiterLatch, the waiter
// thread will be able to execute.
assertTrue(accessed.get());
} finally {
cluster.shutdown();
}
}
@Test
public void testAddVolumes() throws IOException {
final int numNewVolumes = 3;
@ -242,8 +356,8 @@ public void testAddVolumes() throws IOException {
@Test
public void testAddVolumeWithSameStorageUuid() throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
HdfsConfiguration config = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
.numDataNodes(1).build();
try {
cluster.waitActive();


@ -37,7 +37,6 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
@ -53,6 +52,7 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
import static org.junit.Assert.assertEquals;
@ -368,7 +368,7 @@ public void run() {
fs.close();
FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
.getFSDataset();
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
.getInstance(conf, fsDataset);
FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);


@ -25,6 +25,7 @@
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -59,7 +60,6 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.junit.Assert;
import org.junit.Test;
@ -236,7 +236,7 @@ public void testInitReplicaRecovery() throws IOException {
final long firstblockid = 10000L;
final long gs = 7777L;
final long length = 22L;
final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
String bpid = "BP-TEST";
final Block[] blocks = new Block[5];
for(int i = 0; i < blocks.length; i++) {


@ -43,6 +43,7 @@
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@ -77,7 +78,6 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
@ -399,7 +399,7 @@ public void testProvidedVolumeImpl() throws IOException {
public void testBlockLoad() throws IOException {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(volumeMap, null);
assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
@ -475,7 +475,7 @@ private int getBlocksInProvidedVolumes(String basePath, int numBlocks,
vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID],
new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId,
numBlocks));
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null);
totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
}
@ -585,7 +585,7 @@ public void testProvidedReplicaSuffixExtraction() {
public void testProvidedReplicaPrefix() throws Exception {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(volumeMap, null);
Path expectedPrefix = new Path(


@ -23,15 +23,16 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.util.AutoCloseableLock;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Unit test for ReplicasMap class
*/
public class TestReplicaMap {
private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
private final String bpid = "BP-TEST";
private final Block block = new Block(1234, 1234, 1234);
@ -111,7 +112,7 @@ public void testRemove() {
@Test
public void testMergeAll() {
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
Block tmpBlock = new Block(5678, 5678, 5678);
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
@ -122,7 +123,7 @@ public void testMergeAll() {
@Test
public void testAddAll() {
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
Block tmpBlock = new Block(5678, 5678, 5678);
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));


@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@ -47,7 +48,6 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert;
import org.junit.Test;
@ -550,7 +550,7 @@ public void testReplicaMapAfterDatanodeRestart() throws Exception {
bpList.size() == 2);
createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
oldReplicaMap.addAll(dataSet.volumeMap);
cluster.restartDataNode(0);