HDFS-15382. Split one FsDatasetImpl lock to block pool grain locks. (#3941). Contributed by limingxiang.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Signed-off-by: litao <tomleescut@gmail.com>
He Xiaoqiao 2022-03-12 18:40:09 +08:00
parent 672e380c4f
commit a32cfc2169
GPG Key ID: A80CC124E9A0FA63
22 changed files with 384 additions and 447 deletions
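
For orientation, a minimal, hedged sketch (not part of the commit itself) of the locking pattern this change moves to: instead of one dataset-wide read/write lock in FsDatasetImpl, callers take a per-block-pool lock from DataSetLockManager in a try-with-resources block. The class and method in the sketch are hypothetical; DataSetLockManager, LockLevel.BLOCK_POOl, addLock, readLock and AutoCloseDataSetLock are the names used in the diff below.

// Sketch only: per-block-pool locking as introduced by this change.
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;

class BlockPoolLockSketch {
  private final DataSetLockManager lockManager = new DataSetLockManager();

  void readUnderBlockPoolLock(String bpid) {
    // Register the per-block-pool lock first, as BPServiceActor does below.
    lockManager.addLock(LockLevel.BLOCK_POOl, bpid);
    // Only this block pool is locked; operations on other pools can proceed.
    try (AutoCloseDataSetLock l =
        lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      // read replica state for this block pool here
    }
  }
}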

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@ -309,6 +310,10 @@ class BPServiceActor implements Runnable {
// info.
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// Initialize the block pool lock during initialization.
dn.getDataSetLockManager().addLock(LockLevel.BLOCK_POOl,
nsInfo.getBlockPoolID());
// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@ -256,7 +257,8 @@ class BlockSender implements java.io.Closeable {
// the append write.
ChunkChecksum chunkChecksum = null;
final long replicaVisibleLength;
try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
try (AutoCloseableLock lock = datanode.getDataSetLockManager().readLock(
LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}

View File

@ -146,6 +146,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@ -433,6 +434,7 @@ public class DataNode extends ReconfigurableBase
.availableProcessors();
private static final double CONGESTION_RATIO = 1.5;
private DiskBalancer diskBalancer;
private DataSetLockManager dataSetLockManager;
private final ExecutorService xferService;
@ -474,6 +476,7 @@ public class DataNode extends ReconfigurableBase
this.pipelineSupportSlownode = false;
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
this.dnConf = new DNConf(this);
this.dataSetLockManager = new DataSetLockManager(conf);
initOOBTimeout();
storageLocationChecker = null;
volumeChecker = new DatasetVolumeChecker(conf, new Timer());
@ -492,6 +495,7 @@ public class DataNode extends ReconfigurableBase
super(conf);
this.tracer = createTracer(conf);
this.fileIoProvider = new FileIoProvider(conf, this);
this.dataSetLockManager = new DataSetLockManager(conf);
this.blockScanner = new BlockScanner(this);
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@ -2461,6 +2465,7 @@ public class DataNode extends ReconfigurableBase
notifyAll();
}
tracer.close();
dataSetLockManager.lockLeakCheck();
}
/**
@ -3367,7 +3372,8 @@ public class DataNode extends ReconfigurableBase
final BlockConstructionStage stage;
//get replica information
try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
try (AutoCloseableLock lock = dataSetLockManager.readLock(
LockLevel.BLOCK_POOl, b.getBlockPoolId())) {
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {
@ -4084,6 +4090,10 @@ public class DataNode extends ReconfigurableBase
|| stage == PIPELINE_SETUP_APPEND_RECOVERY);
}
public DataSetLockManager getDataSetLockManager() {
return dataSetLockManager;
}
boolean isSlownodeByNameserviceId(String nsId) {
return blockPoolManager.isSlownodeByNameserviceId(nsId);
}

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi
.FsVolumeReferences;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
@ -502,16 +501,13 @@ public class DiskBalancer {
private Map<String, String> getStorageIDToVolumeBasePathMap()
throws DiskBalancerException {
Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
FsDatasetSpi.FsVolumeReferences references;
try {
try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
references = this.dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
storageIDToVolBasePathMap.put(vol.getStorageID(),
vol.getBaseURI().getPath());
}
references.close();
// Get a volumes snapshot, so there is no need to acquire the dataset lock.
try (FsDatasetSpi.FsVolumeReferences references = dataset.
getFsVolumeReferences()) {
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
storageIDToVolBasePathMap.put(vol.getStorageID(),
vol.getBaseURI().getPath());
}
} catch (IOException ex) {
LOG.error("Disk Balancer - Internal Error.", ex);

View File

@ -35,8 +35,9 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -241,7 +242,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
* {@link FsDatasetSpi#acquireDatasetLockManager} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
@ -657,21 +658,12 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
FsVolumeSpi destination) throws IOException;
/**
* Acquire the lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example modifying the genStamp of a block instance.
*/
AutoCloseableLock acquireDatasetLock();
/***
* Acquire the read lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example modifying the genStamp of a block instance.
* Acquire the lock manager for the data set. This prevents other threads from
* modifying the volume map structure inside the datanode.
* @return the DataNodeLockManager instance.
*/
AutoCloseableLock acquireDatasetReadLock();
DataNodeLockManager<? extends AutoCloseDataSetLock> acquireDatasetLockManager();
/**
* Deep copy the replica info belonging to given block pool.

View File

@ -43,7 +43,6 @@ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
@ -914,7 +913,7 @@ class BlockPoolSlice {
private boolean readReplicasFromCache(ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap) {
ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
ReplicaMap tmpReplicaMap = new ReplicaMap();
File replicaFile = new File(replicaCacheDir, REPLICA_CACHE_FILE);
// Check whether the file exists or not.
if (!replicaFile.exists()) {

View File

@ -32,7 +32,6 @@ import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -41,9 +40,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@ -64,6 +60,10 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
@ -119,7 +119,6 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.InstrumentedReadWriteLock;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Sets;
@ -188,7 +187,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public FsVolumeImpl getVolume(final ExtendedBlock b) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
final ReplicaInfo r =
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null ? (FsVolumeImpl) r.getVolume() : null;
@ -198,7 +198,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid)
throws IOException {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
bpid)) {
ReplicaInfo r = volumeMap.get(bpid, blkid);
if (r == null) {
return null;
@ -210,12 +211,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
Set<? extends Replica> replicas = null;
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
EMPTY_SET : volumeMap.replicas(bpid));
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
Set<ReplicaInfo> replicas = new HashSet<>();
volumeMap.replicas(bpid, (iterator) -> {
while (iterator.hasNext()) {
ReplicaInfo b = iterator.next();
replicas.add(b);
}
});
return replicas;
}
return Collections.unmodifiableSet(replicas);
}
/**
@ -275,13 +280,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private boolean blockPinningEnabled;
private final int maxDataLength;
@VisibleForTesting
final AutoCloseableLock datasetWriteLock;
@VisibleForTesting
final AutoCloseableLock datasetReadLock;
@VisibleForTesting
final InstrumentedReadWriteLock datasetRWLock;
private final Condition datasetWriteLockCondition;
private final DataSetLockManager lockManager;
private static String blockPoolId = "";
// Make limited notify times from DirectoryScanner to NameNode.
@ -300,33 +299,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
this.datasetRWLock = new InstrumentedReadWriteLock(
conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT),
"FsDatasetRWLock", LOG, conf.getTimeDuration(
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS),
conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
TimeUnit.MILLISECONDS));
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
boolean enableRL = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
// The read lock can be disabled by the above config key. If it is disabled
// then we simply make the both the read and write lock variables hold
// the write lock. All accesses to the lock are via these variables, so that
// effectively disables the read lock.
if (enableRL) {
LOG.info("The datanode lock is a read write lock");
this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
} else {
LOG.info("The datanode lock is an exclusive write lock");
this.datasetReadLock = this.datasetWriteLock;
}
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
this.lockManager = datanode.getDataSetLockManager();
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
@ -365,7 +338,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
volumeMap = new ReplicaMap(lockManager);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
@SuppressWarnings("unchecked")
@ -421,16 +394,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
lastDirScannerNotifyTime = System.currentTimeMillis();
}
@Override
public AutoCloseableLock acquireDatasetLock() {
return datasetWriteLock.acquire();
}
@Override
public AutoCloseableLock acquireDatasetReadLock() {
return datasetReadLock.acquire();
}
/**
* Gets initial volume failure information for all volumes that failed
* immediately at startup. The method works by determining the set difference
@ -465,42 +428,40 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Activate a volume to serve requests.
* @throws IOException if the storage UUID already exists.
*/
private void activateVolume(
private synchronized void activateVolume(
ReplicaMap replicaMap,
Storage.StorageDirectory sd, StorageType storageType,
FsVolumeReference ref) throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
if (dnStorage != null) {
final String errorMsg = String.format(
"Found duplicated storage UUID: %s in %s.",
sd.getStorageUuid(), sd.getVersionFile());
LOG.error(errorMsg);
throw new IOException(errorMsg);
}
// Check if there is same storage type on the mount.
// Only useful when same disk tiering is turned on.
FsVolumeImpl volumeImpl = (FsVolumeImpl) ref.getVolume();
FsVolumeReference checkRef = volumes
.getMountVolumeMap()
.getVolumeRefByMountAndStorageType(
volumeImpl.getMount(), volumeImpl.getStorageType());
if (checkRef != null) {
final String errorMsg = String.format(
"Storage type %s already exists on same mount: %s.",
volumeImpl.getStorageType(), volumeImpl.getMount());
checkRef.close();
LOG.error(errorMsg);
throw new IOException(errorMsg);
}
volumeMap.mergeAll(replicaMap);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(volumeImpl);
volumes.addVolume(ref);
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
if (dnStorage != null) {
final String errorMsg = String.format(
"Found duplicated storage UUID: %s in %s.",
sd.getStorageUuid(), sd.getVersionFile());
LOG.error(errorMsg);
throw new IOException(errorMsg);
}
// Check if there is same storage type on the mount.
// Only useful when same disk tiering is turned on.
FsVolumeImpl volumeImpl = (FsVolumeImpl) ref.getVolume();
FsVolumeReference checkRef = volumes
.getMountVolumeMap()
.getVolumeRefByMountAndStorageType(
volumeImpl.getMount(), volumeImpl.getStorageType());
if (checkRef != null) {
final String errorMsg = String.format(
"Storage type %s already exists on same mount: %s.",
volumeImpl.getStorageType(), volumeImpl.getMount());
checkRef.close();
LOG.error(errorMsg);
throw new IOException(errorMsg);
}
volumeMap.mergeAll(replicaMap);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(volumeImpl);
volumes.addVolume(ref);
}
private void addVolume(Storage.StorageDirectory sd) throws IOException {
@ -517,8 +478,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setConf(this.conf)
.build();
FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap =
new ReplicaMap(datasetReadLock, datasetWriteLock);
// no need to acquire lock.
ReplicaMap tempVolumeMap = new ReplicaMap();
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@ -557,13 +518,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume =
createFsVolume(sd.getStorageUuid(), sd, location);
final ReplicaMap tempVolumeMap =
new ReplicaMap(new ReentrantReadWriteLock());
// No need to acquire the lock.
final ReplicaMap tempVolumeMap = new ReplicaMap();
ArrayList<IOException> exceptions = Lists.newArrayList();
for (final NamespaceInfo nsInfo : nsInfos) {
String bpid = nsInfo.getBlockPoolID();
try {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
fsVolume.addBlockPool(bpid, this.conf, this.timer);
fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
} catch (IOException e) {
@ -603,7 +564,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new ArrayList<>(storageLocsToRemove);
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
List<String> storageToRemove = new ArrayList<>();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// This object lock protects volume-related data structures, such as adding
// and removing volumes. The volumeMap lock is obtained again when accessing replicaInfo.
synchronized (this) {
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
final StorageLocation sdLocation = sd.getStorageLocation();
@ -615,7 +578,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Disable the volume from the service.
asyncDiskService.removeVolume(sd.getStorageUuid());
volumes.removeVolume(sdLocation, clearFailure);
volumes.waitVolumeRemoved(5000, datasetWriteLockCondition);
volumes.waitVolumeRemoved(5000, this);
// Removed all replica information for the blocks on the volume.
// Unlike updating the volumeMap in addVolume(), this operation does
@ -623,18 +586,19 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
for (String bpid : volumeMap.getBlockPoolList()) {
List<ReplicaInfo> blocks = blkToInvalidate
.computeIfAbsent(bpid, (k) -> new ArrayList<>());
for (Iterator<ReplicaInfo> it =
volumeMap.replicas(bpid).iterator(); it.hasNext();) {
ReplicaInfo block = it.next();
final StorageLocation blockStorageLocation =
block.getVolume().getStorageLocation();
LOG.trace("checking for block " + block.getBlockId() +
" with storageLocation " + blockStorageLocation);
if (blockStorageLocation.equals(sdLocation)) {
blocks.add(block);
it.remove();
volumeMap.replicas(bpid, (iterator) -> {
while (iterator.hasNext()) {
ReplicaInfo block = iterator.next();
final StorageLocation blockStorageLocation =
block.getVolume().getStorageLocation();
LOG.trace("checking for block " + block.getBlockId() +
" with storageLocation " + blockStorageLocation);
if (blockStorageLocation.equals(sdLocation)) {
blocks.add(block);
iterator.remove();
}
}
}
});
}
storageToRemove.add(sd.getStorageUuid());
storageLocationsToRemove.remove(sdLocation);
@ -662,8 +626,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for(String storageUuid : storageToRemove) {
synchronized (this) {
for (String storageUuid : storageToRemove) {
storageMap.remove(storageUuid);
}
}
@ -853,7 +817,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long seekOffset) throws IOException {
ReplicaInfo info;
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
}
@ -941,7 +906,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
@ -1117,7 +1083,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
targetStorageType, targetStorageId);
boolean useVolumeOnSameMount = false;
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
block.getBlockPoolId())) {
if (shouldConsiderSameMountVolume) {
volumeRef = volumes.getVolumeByMount(targetStorageType,
((FsVolumeImpl) replicaInfo.getVolume()).getMount(),
@ -1311,7 +1278,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeReference volumeRef = null;
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
block.getBlockPoolId())) {
volumeRef = destination.obtainReference();
}
@ -1325,6 +1293,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return replicaInfo;
}
@Override
public DataNodeLockManager<AutoCloseDataSetLock> acquireDatasetLockManager() {
return lockManager;
}
/**
* Compute and store the checksum for a block file that does not already have
* its checksum computed.
@ -1399,7 +1372,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
@ -1451,7 +1425,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private ReplicaInPipeline append(String bpid,
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
// If the block is cached, start uncaching it.
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new IOException("Only a Finalized replica can be appended to; "
@ -1547,7 +1521,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (true) {
try {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
ReplicaInPipeline replica;
@ -1579,7 +1554,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.info("Recover failed close " + b);
while (true) {
try {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
@ -1602,7 +1578,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
@ -1680,7 +1657,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
try {
while (true) {
try {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
ReplicaInfo replicaInfo =
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
@ -1711,7 +1689,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
// check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
@ -1772,7 +1751,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
@ -1851,7 +1831,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo lastFoundReplicaInfo = null;
boolean isInPipeline = false;
do {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
ReplicaInfo currentReplicaInfo =
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (currentReplicaInfo == lastFoundReplicaInfo) {
@ -1906,7 +1887,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
false);
}
long startHoldLockTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
@ -1967,7 +1949,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null;
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
@ -2003,7 +1986,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
// Compare generation stamp of old and new replica before finalizing
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
> replicaInfo.getGenerationStamp()) {
@ -2049,7 +2032,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo != null &&
@ -2107,47 +2091,50 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = null;
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
}
Set<String> missingVolumesReported = new HashSet<>();
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
// skip PROVIDED replicas.
if (b.getVolume().getStorageType() == StorageType.PROVIDED) {
continue;
}
String volStorageID = b.getVolume().getStorageID();
switch(b.getState()) {
case FINALIZED:
case RBW:
case RWR:
break;
case RUR:
// use the original replica.
b = b.getOriginalReplica();
break;
case TEMPORARY:
continue;
default:
assert false : "Illegal ReplicaInfo state.";
continue;
}
BlockListAsLongs.Builder storageBuilder = builders.get(volStorageID);
// a storage in the process of failing will not be in the volumes list
// but will be in the replica map.
if (storageBuilder != null) {
storageBuilder.add(b);
} else {
if (!missingVolumesReported.contains(volStorageID)) {
LOG.warn("Storage volume: " + volStorageID + " missing for the"
+ " replica block: " + b + ". Probably being removed!");
missingVolumesReported.add(volStorageID);
volumeMap.replicas(bpid, (iterator) -> {
while (iterator.hasNext()) {
ReplicaInfo b = iterator.next();
// skip PROVIDED replicas.
if (b.getVolume().getStorageType() == StorageType.PROVIDED) {
continue;
}
String volStorageID = b.getVolume().getStorageID();
switch(b.getState()) {
case FINALIZED:
case RBW:
case RWR:
break;
case RUR:
// use the original replica.
b = b.getOriginalReplica();
break;
case TEMPORARY:
continue;
default:
assert false : "Illegal ReplicaInfo state.";
continue;
}
BlockListAsLongs.Builder storageBuilder = builders.get(volStorageID);
// a storage in the process of failing will not be in the volumes list
// but will be in the replica map.
if (storageBuilder != null) {
storageBuilder.add(b);
} else {
if (!missingVolumesReported.contains(volStorageID)) {
LOG.warn("Storage volume: " + volStorageID + " missing for the"
+ " replica block: " + b + ". Probably being removed!");
missingVolumesReported.add(volStorageID);
}
}
}
}
});
}
for (FsVolumeImpl v : curVolumes) {
@ -2162,7 +2149,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
* {@link FsDatasetSpi#acquireDatasetLockManager()} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
@ -2170,14 +2157,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
if (b.getState() == ReplicaState.FINALIZED) {
finalized.add(b);
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
ArrayList<ReplicaInfo> finalized =
new ArrayList<>(volumeMap.size(bpid));
volumeMap.replicas(bpid, (iterator) -> {
while (iterator.hasNext()) {
ReplicaInfo b = iterator.next();
if (b.getState() == ReplicaState.FINALIZED) {
finalized.add(new FinalizedReplica((FinalizedReplica)b));
}
}
}
});
return finalized;
}
}
@ -2310,7 +2300,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
for (int i = 0; i < invalidBlks.length; i++) {
final ReplicaInfo removing;
final FsVolumeImpl v;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
if (info == null) {
ReplicaInfo infoByBlockId =
@ -2433,7 +2423,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long length, genstamp;
Executor volumeExecutor;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
boolean success = false;
try {
@ -2501,7 +2491,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public boolean contains(final ExtendedBlock block) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
block.getBlockPoolId())) {
final long blockId = block.getLocalBlock().getBlockId();
final String bpid = block.getBlockPoolId();
final ReplicaInfo r = volumeMap.get(bpid, blockId);
@ -2628,7 +2619,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
curDirScannerNotifyCount = 0;
lastDirScannerNotifyTime = startTimeMs;
}
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null &&
memBlockInfo.getState() != ReplicaState.FINALIZED) {
@ -2851,7 +2842,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public String getReplicaString(String bpid, long blockId) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
final Replica r = volumeMap.get(bpid, blockId);
return r == null ? "null" : r.toString();
}
@ -2865,12 +2856,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
datanode.getDnConf().getXceiverStopTimeout());
}
/** static version of {@link #initReplicaRecovery(RecoveringBlock)}. */
static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
while (true) {
try {
try (AutoCloseableLock lock = map.getLock().acquire()) {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
}
} catch (MustStopExistingWriter e) {
e.getReplicaInPipeline().stopWriter(xceiverStopTimeout);
}
}
}
/** static version of {@link #initReplicaRecovery(RecoveringBlock)}. */
static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
Block block, long recoveryId, long xceiverStopTimeout, DataSetLockManager
lockManager) throws IOException {
while (true) {
try {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
}
} catch (MustStopExistingWriter e) {
@ -2959,7 +2964,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final long newBlockId,
final long newlength) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
oldBlock.getBlockPoolId())) {
//get replica
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@ -3078,7 +3084,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
block.getBlockPoolId())) {
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
block.getBlockId());
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@ -3095,7 +3102,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throws IOException {
LOG.info("Adding block pool " + bpid);
AddBlockPoolException volumeExceptions = new AddBlockPoolException();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try {
volumes.addBlockPool(bpid, conf);
} catch (AddBlockPoolException e) {
@ -3125,7 +3132,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public void shutdownBlockPool(String bpid) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
LOG.info("Removing block pool " + bpid);
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
= getBlockReports(bpid);
@ -3199,7 +3206,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override //FsDatasetSpi
public void deleteBlockPool(String bpid, boolean force)
throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
if (!force) {
for (FsVolumeImpl volume : curVolumes) {
@ -3228,7 +3235,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
block.getBlockPoolId())) {
final Replica replica = volumeMap.get(block.getBlockPoolId(),
block.getBlockId());
if (replica == null) {
@ -3282,7 +3290,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public void onCompleteLazyPersist(String bpId, long blockId,
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpId)) {
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@ -3416,7 +3424,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
try {
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
block.getBlockPoolId())) {
replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
// If replicaInfo is null, the block was either deleted before
@ -3483,7 +3492,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo replicaInfo, newReplicaInfo;
final String bpid = replicaState.getBlockPoolId();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
replicaState.getBlockId());
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@ -3661,18 +3670,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
void stopAllDataxceiverThreads(FsVolumeImpl volume) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for (String bpid : volumeMap.getBlockPoolList()) {
Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
for (ReplicaInfo replicaInfo : replicas) {
if ((replicaInfo.getState() == ReplicaState.TEMPORARY
|| replicaInfo.getState() == ReplicaState.RBW)
&& replicaInfo.getVolume().equals(volume)) {
ReplicaInPipeline replicaInPipeline =
(ReplicaInPipeline) replicaInfo;
replicaInPipeline.interruptThread();
for (String bpid : volumeMap.getBlockPoolList()) {
try (AutoCloseDataSetLock lock = lockManager
.writeLock(LockLevel.BLOCK_POOl, bpid)) {
volumeMap.replicas(bpid, (iterator) -> {
while (iterator.hasNext()) {
ReplicaInfo replicaInfo = iterator.next();
if ((replicaInfo.getState() == ReplicaState.TEMPORARY
|| replicaInfo.getState() == ReplicaState.RBW)
&& replicaInfo.getVolume().equals(volume)) {
ReplicaInPipeline replicaInPipeline =
(ReplicaInPipeline) replicaInfo;
replicaInPipeline.interruptThread();
}
}
}
});
}
}
}

View File

@ -345,6 +345,28 @@ class FsVolumeList {
FsDatasetImpl.LOG.info("Volume reference is released.");
}
/**
* Wait for the reference of the volume removed from a previous
* {@link #removeVolume(FsVolumeImpl)} call to be released.
*
* @param sleepMillis interval to recheck.
* @param condition object to wait on; the caller must hold its monitor.
*/
void waitVolumeRemoved(int sleepMillis, Object condition) {
while (!checkVolumesRemoved()) {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug("Waiting for volume reference to be released.");
}
try {
condition.wait(sleepMillis);
} catch (InterruptedException e) {
FsDatasetImpl.LOG.info("Thread interrupted when waiting for "
+ "volume reference to be released.");
Thread.currentThread().interrupt();
}
}
FsDatasetImpl.LOG.info("Volume reference is released.");
}
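// Illustrative sketch, not part of this change: Object.wait() throws
// IllegalMonitorStateException unless the caller holds the monitor of the
// object being waited on, so callers of waitVolumeRemoved must synchronize
// on the condition object they pass in; FsDatasetImpl does this with
// synchronized (this) and passes `this` as the condition. A minimal,
// self-contained illustration of that contract (the class is hypothetical):
class WaitContractSketch {
  void waitBriefly(Object condition, int sleepMillis) {
    synchronized (condition) {        // caller must hold the monitor
      try {
        condition.wait(sleepMillis);  // legal only while the monitor is held
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}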
@Override
public String toString() {
return volumes.toString();

View File

@ -30,7 +30,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -134,7 +133,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
Configuration conf) {
this.providedVolume = volume;
bpVolumeMap = new ReplicaMap(new ReentrantReadWriteLock());
bpVolumeMap = new ReplicaMap();
Class<? extends BlockAliasMap> fmt =
conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
TextFileRegionAliasMap.class, BlockAliasMap.class);
@ -219,7 +218,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
}
public boolean isEmpty() {
return bpVolumeMap.replicas(bpid).size() == 0;
return bpVolumeMap.size(bpid) == 0;
}
public void shutdown(BlockListAsLongs blocksListsAsLongs) {

View File

@ -18,46 +18,49 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.common.NoLockManager;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.util.LightWeightResizableGSet;
import org.apache.hadoop.util.AutoCloseableLock;
/**
* Maintains the replica map.
*/
class ReplicaMap {
// Lock object to synchronize this instance.
private final AutoCloseableLock readLock;
private final AutoCloseableLock writeLock;
private DataNodeLockManager<AutoCloseDataSetLock> lockManager;
// Map of block pool Id to another map of block Id to ReplicaInfo.
private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
new HashMap<>();
new ConcurrentHashMap<>();
ReplicaMap(AutoCloseableLock readLock, AutoCloseableLock writeLock) {
if (readLock == null || writeLock == null) {
ReplicaMap(DataNodeLockManager<AutoCloseDataSetLock> manager) {
if (manager == null) {
throw new HadoopIllegalArgumentException(
"Lock to synchronize on cannot be null");
"Object to synchronize on cannot be null");
}
this.readLock = readLock;
this.writeLock = writeLock;
this.lockManager = manager;
}
ReplicaMap(ReadWriteLock lock) {
this(new AutoCloseableLock(lock.readLock()),
new AutoCloseableLock(lock.writeLock()));
// Used for unit tests or a temporary replicaMap that does not need to be protected by a lock.
ReplicaMap() {
this.lockManager = new NoLockManager();
}
String[] getBlockPoolList() {
try (AutoCloseableLock l = readLock.acquire()) {
return map.keySet().toArray(new String[map.keySet().size()]);
}
Set<String> bpset = map.keySet();
return bpset.toArray(new String[bpset.size()]);
}
private void checkBlockPool(String bpid) {
@ -100,7 +103,7 @@ class ReplicaMap {
*/
ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid);
try (AutoCloseableLock l = readLock.acquire()) {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.get(new Block(blockId)) : null;
}
@ -117,7 +120,7 @@ class ReplicaMap {
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
@ -135,7 +138,7 @@ class ReplicaMap {
ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
@ -164,13 +167,28 @@ class ReplicaMap {
* Merge all entries from the given replica map into the local replica map.
*/
void mergeAll(ReplicaMap other) {
other.map.forEach(
(bp, replicaInfos) -> {
replicaInfos.forEach(
replicaInfo -> add(bp, replicaInfo)
);
Set<String> bplist = other.map.keySet();
for (String bp : bplist) {
checkBlockPool(bp);
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bp)) {
LightWeightResizableGSet<Block, ReplicaInfo> replicaInfos = other.map.get(bp);
LightWeightResizableGSet<Block, ReplicaInfo> curSet = map.get(bp);
HashSet<ReplicaInfo> replicaSet = new HashSet<>();
// Can't add to a GSet while iterating over another GSet; it may cause an endless loop.
for (ReplicaInfo replicaInfo : replicaInfos) {
replicaSet.add(replicaInfo);
}
);
for (ReplicaInfo replicaInfo : replicaSet) {
checkBlock(replicaInfo);
if (curSet == null) {
// Add an entry for block pool if it does not exist already
curSet = new LightWeightResizableGSet<>();
map.put(bp, curSet);
}
curSet.put(replicaInfo);
}
}
}
}
/**
@ -184,7 +202,7 @@ class ReplicaMap {
ReplicaInfo remove(String bpid, Block block) {
checkBlockPool(bpid);
checkBlock(block);
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
ReplicaInfo replicaInfo = m.get(block);
@ -206,7 +224,7 @@ class ReplicaMap {
*/
ReplicaInfo remove(String bpid, long blockId) {
checkBlockPool(bpid);
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
return m.remove(new Block(blockId));
@ -221,7 +239,7 @@ class ReplicaMap {
* @return the number of replicas in the map
*/
int size(String bpid) {
try (AutoCloseableLock l = readLock.acquire()) {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.size() : 0;
}
@ -229,11 +247,9 @@ class ReplicaMap {
/**
* Get a collection of the replicas for given block pool
* This method is <b>not synchronized</b>. It needs to be synchronized
* externally using the lock, both for getting the replicas
* values from the map and iterating over it. Mutex can be accessed using
* {@link #getLock()} method.
*
* This method is <b>not synchronized</b>. To keep it thread safe, use
* {@link #replicas(String, Consumer<Iterator<ReplicaInfo>>)} instead.
*
* @param bpid block pool id
* @return a collection of the replicas belonging to the block pool
*/
@ -243,9 +259,25 @@ class ReplicaMap {
return m != null ? m.values() : null;
}
/**
* Execute the given consumer over the replicas of one block pool, protected
* by the lock manager. This method is <b>synchronized</b>.
*
* @param bpid block pool id
* @param consumer consumer invoked with an iterator over the block pool's replicas
*/
void replicas(String bpid, Consumer<Iterator<ReplicaInfo>> consumer) {
LightWeightResizableGSet<Block, ReplicaInfo> m = null;
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
m = map.get(bpid);
if (m != null) {
m.getIterator(consumer);
}
}
}
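// Illustrative sketch, not part of this change: a hedged usage example of
// the consumer-based iteration above, mirroring how FsDatasetImpl uses it
// elsewhere in this diff. The class below is hypothetical and would have to
// live in org.apache.hadoop.hdfs.server.datanode.fsdataset.impl, since
// ReplicaMap is package-private. The callback runs under the block pool
// read lock taken by replicas(bpid, consumer).
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;

class ReplicaIterationSketch {
  int countFinalized(ReplicaMap volumeMap, String bpid) {
    AtomicInteger finalized = new AtomicInteger();
    volumeMap.replicas(bpid, (iterator) -> {
      while (iterator.hasNext()) {
        ReplicaInfo b = iterator.next();
        if (b.getState() == ReplicaState.FINALIZED) {
          finalized.incrementAndGet();
        }
      }
    });
    return finalized.get();
  }
}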
void initBlockPool(String bpid) {
checkBlockPool(bpid);
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
@ -257,26 +289,8 @@ class ReplicaMap {
void cleanUpBlockPool(String bpid) {
checkBlockPool(bpid);
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
map.remove(bpid);
}
}
/**
* Get the lock object used for synchronizing ReplicasMap
* @return lock object
*/
AutoCloseableLock getLock() {
return writeLock;
}
/**
* Get the lock object used for synchronizing the ReplicasMap for read only
* operations.
* @return The read lock object
*/
AutoCloseableLock getReadLock() {
return readLock;
}
}

View File

@ -40,6 +40,8 @@ import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
import org.apache.hadoop.thirdparty.com.google.common.math.LongMath;
import org.apache.commons.lang3.ArrayUtils;
@ -48,7 +50,6 @@ import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -162,7 +163,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
static final byte[] nullCrcFileData;
private final AutoCloseableLock datasetLock;
private final DataNodeLockManager datasetLockManager;
private final FileIoProvider fileIoProvider;
static {
@ -707,6 +708,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration conf) {
this.datanode = datanode;
this.datasetLockManager = datanode == null ? new DataSetLockManager() :
datanode.getDataSetLockManager();
int storageCount;
if (storage != null && storage.getNumStorageDirs() > 0) {
storageCount = storage.getNumStorageDirs();
@ -721,9 +724,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
registerMBean(datanodeUuid);
this.fileIoProvider = new FileIoProvider(conf, datanode);
this.datasetLock = new AutoCloseableLock();
this.storages = new ArrayList<>();
for (int i = 0; i < storageCount; i++) {
this.storages.add(new SimulatedStorage(
@ -1587,14 +1587,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
public AutoCloseableLock acquireDatasetLock() {
return datasetLock.acquire();
}
@Override
public AutoCloseableLock acquireDatasetReadLock() {
// No RW lock implementation in simulated dataset currently.
return datasetLock.acquire();
public DataNodeLockManager<AutoCloseDataSetLock> acquireDatasetLockManager() {
return null;
}
@Override

View File

@ -128,6 +128,7 @@ public class TestBPOfferService {
private final int[] heartbeatCounts = new int[3];
private DataNode mockDn;
private FsDatasetSpi<?> mockFSDataset;
private DataSetLockManager dataSetLockManager = new DataSetLockManager();
private boolean isSlownode;
@Before
@ -153,6 +154,7 @@ public class TestBPOfferService {
// Wire the dataset to the DN.
Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager();
}
/**
@ -508,6 +510,7 @@ public class TestBPOfferService {
public void testBPInitErrorHandling() throws Exception {
final DataNode mockDn = Mockito.mock(DataNode.class);
Mockito.doReturn(true).when(mockDn).shouldRun();
Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager();
Configuration conf = new Configuration();
File dnDataDir = new File(
new File(TEST_BUILD_DATA, "testBPInitErrorHandling"), "data");

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -252,10 +251,8 @@ public class TestBlockRecovery2 {
final BlockRecoveryCommand.RecoveringBlock recoveringBlock =
new BlockRecoveryCommand.RecoveringBlock(block.getBlock(),
locations, block.getBlock().getGenerationStamp() + 1);
try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
Thread.sleep(2000);
dataNode.initReplicaRecovery(recoveringBlock);
}
Thread.sleep(2000);
dataNode.initReplicaRecovery(recoveringBlock);
} catch (Exception e) {
LOG.error("Something went wrong.", e);
recoveryInitResult.set(false);

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
@ -133,7 +134,8 @@ public class TestDirectoryScanner {
/** Truncate a block file. */
private long truncateBlockFile() throws IOException {
try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
try (AutoCloseableLock lock = fds.acquireDatasetLockManager().writeLock(
LockLevel.BLOCK_POOl, bpid)) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = new File(b.getBlockURI());
File mf = new File(b.getMetadataURI());
@ -158,7 +160,8 @@ public class TestDirectoryScanner {
/** Delete a block file */
private long deleteBlockFile() {
try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
try (AutoCloseableLock lock = fds.acquireDatasetLockManager().
writeLock(LockLevel.BLOCK_POOl, bpid)) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = new File(b.getBlockURI());
File mf = new File(b.getMetadataURI());
@ -174,7 +177,8 @@ public class TestDirectoryScanner {
/** Delete block meta file */
private long deleteMetaFile() {
try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
try (AutoCloseableLock lock = fds.acquireDatasetLockManager().
writeLock(LockLevel.BLOCK_POOl, bpid)) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
// Delete a metadata file
if (b.metadataExists() && b.deleteMetadata()) {
@ -193,7 +197,8 @@ public class TestDirectoryScanner {
* @throws IOException
*/
private void duplicateBlock(long blockId) throws IOException {
try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
try (AutoCloseableLock lock = fds.acquireDatasetLockManager().
writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
try (FsDatasetSpi.FsVolumeReferences volumes =
fds.getFsVolumeReferences()) {
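
The hunks above all rewrite the same pattern: instead of taking a dataset-wide AutoCloseableLock, the caller asks the lock manager for a lock scoped to a single block pool. Below is a minimal sketch of that pattern, assuming the caller already has a DataNodeLockManager and a block pool id; the class and method names are illustrative, not from this patch, and readers of the same pool are expected to share the read lock while writers are exclusive.

import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;

// Illustrative sketch, not part of this patch.
class BlockPoolLockingSketch {
  // Exclusive access to one block pool; other block pools stay unlocked.
  static void withWriteLock(DataNodeLockManager<AutoCloseDataSetLock> lockManager,
      String bpid, Runnable work) {
    try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
      work.run();
    }
  }

  // Shared access: readers of the same block pool may hold this concurrently.
  static void withReadLock(DataNodeLockManager<AutoCloseDataSetLock> lockManager,
      String bpid, Runnable work) {
    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      work.run();
    }
  }
}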

View File

@ -23,8 +23,9 @@ import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@ -452,12 +453,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
public AutoCloseableLock acquireDatasetLock() {
return null;
}
@Override
public AutoCloseableLock acquireDatasetReadLock() {
public DataNodeLockManager<AutoCloseDataSetLock> acquireDatasetLockManager() {
return null;
}

View File

@ -434,7 +434,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
@Override
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
// Reload replicas from the disk.
ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock);
ReplicaMap replicaMap = new ReplicaMap(dataset.acquireDatasetLockManager());
try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
for (FsVolumeSpi vol : refs) {
FsVolumeImpl volume = (FsVolumeImpl) vol;

View File

@ -21,11 +21,11 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
@ -70,7 +70,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.Lists;
@ -94,7 +93,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
@ -140,6 +138,7 @@ public class TestFsDatasetImpl {
private DataNode datanode;
private DataStorage storage;
private FsDatasetImpl dataset;
private DataSetLockManager manager = new DataSetLockManager();
private final static String BLOCKPOOL = "BP-TEST";
@ -213,6 +212,7 @@ public class TestFsDatasetImpl {
this.conf.set(DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY,
replicaCacheRootDir);
when(datanode.getDataSetLockManager()).thenReturn(manager);
when(datanode.getConf()).thenReturn(conf);
final DNConf dnConf = new DNConf(datanode);
when(datanode.getDnConf()).thenReturn(dnConf);
@ -232,118 +232,6 @@ public class TestFsDatasetImpl {
assertEquals(0, dataset.getNumFailedVolumes());
}
@Test(timeout=10000)
public void testReadLockEnabledByDefault()
throws Exception {
final FsDatasetSpi ds = dataset;
AtomicBoolean accessed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch waiterLatch = new CountDownLatch(1);
Thread holder = new Thread() {
public void run() {
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
latch.countDown();
// wait for the waiter thread to access the lock.
waiterLatch.await();
} catch (Exception e) {
}
}
};
Thread waiter = new Thread() {
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
waiterLatch.countDown();
return;
}
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
accessed.getAndSet(true);
// signal the holder thread.
waiterLatch.countDown();
} catch (Exception e) {
}
}
};
waiter.start();
holder.start();
holder.join();
waiter.join();
// The holder thread is still holding the lock, but the waiter can still
// run as the lock is a shared read lock.
// Otherwise test will timeout with deadlock.
assertEquals(true, accessed.get());
holder.interrupt();
}
@Test(timeout=20000)
public void testReadLockCanBeDisabledByConfig()
throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
try {
AtomicBoolean accessed = new AtomicBoolean(false);
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch waiterLatch = new CountDownLatch(1);
Thread holder = new Thread() {
public void run() {
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
latch.countDown();
// wait for the waiter thread to access the lock.
waiterLatch.await();
} catch (Exception e) {
}
}
};
Thread waiter = new Thread() {
public void run() {
try {
// Wait for holder to get ds read lock.
latch.await();
} catch (InterruptedException e) {
waiterLatch.countDown();
return;
}
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
accessed.getAndSet(true);
// signal the holder thread.
waiterLatch.countDown();
} catch (Exception e) {
}
}
};
waiter.start();
holder.start();
// Wait for some time to make sure we are in deadlock,
try {
GenericTestUtils.waitFor(() ->
accessed.get(),
100, 10000);
fail("Waiter thread should not execute.");
} catch (TimeoutException e) {
}
// Release waiterLatch to exit deadlock.
waiterLatch.countDown();
holder.join();
waiter.join();
// After releasing waiterLatch, the waiter
// thread will be able to execute.
assertTrue(accessed.get());
} finally {
cluster.shutdown();
}
}
@Test
public void testAddVolumes() throws IOException {
final int numNewVolumes = 3;
@ -413,6 +301,7 @@ public class TestFsDatasetImpl {
final ShortCircuitRegistry shortCircuitRegistry =
new ShortCircuitRegistry(conf);
when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
when(datanode.getDataSetLockManager()).thenReturn(manager);
createStorageDirs(storage, conf, 1);
dataset = new FsDatasetImpl(datanode, storage, conf);
@ -480,6 +369,8 @@ public class TestFsDatasetImpl {
final ShortCircuitRegistry shortCircuitRegistry =
new ShortCircuitRegistry(conf);
when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
when(datanode.getDataSetLockManager()).thenReturn(manager);
createStorageDirs(storage, conf, 0);
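
Every FsDatasetImpl built in this test class now looks up the lock manager through the DataNode, so the mock has to return one before the dataset is constructed. A minimal sketch of that wiring, mirroring the setUp() stubs above; the helper name is illustrative and could live in the test class, with storage prepared by createStorageDirs as in the surrounding tests.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;

// Illustrative helper, not part of this patch: a mocked DataNode complete
// enough for new FsDatasetImpl(dn, storage, conf), including the
// DataSetLockManager stub this change introduces.
static DataNode newMockDataNode(Configuration conf, DataSetLockManager manager)
    throws IOException {
  DataNode dn = mock(DataNode.class);
  when(dn.getConf()).thenReturn(conf);
  DNConf dnConf = new DNConf(dn);
  when(dn.getDnConf()).thenReturn(dnConf);
  ShortCircuitRegistry registry = new ShortCircuitRegistry(conf);
  when(dn.getShortCircuitRegistry()).thenReturn(registry);
  when(dn.getDataSetLockManager()).thenReturn(manager);
  return dn;
}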

View File

@ -57,7 +57,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
@ -407,7 +406,7 @@ public class TestFsVolumeList {
fs.close();
FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
.getFSDataset();
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
ReplicaMap volumeMap = new ReplicaMap(fsDataset.acquireDatasetLockManager());
RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
.getInstance(conf, fsDataset);
FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);

View File

@ -25,7 +25,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -47,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@ -236,7 +236,8 @@ public class TestInterDatanodeProtocol {
final long firstblockid = 10000L;
final long gs = 7777L;
final long length = 22L;
final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
DataSetLockManager manager = new DataSetLockManager();
final ReplicaMap map = new ReplicaMap(manager);
String bpid = "BP-TEST";
final Block[] blocks = new Block[5];
for(int i = 0; i < blocks.length; i++) {
@ -252,7 +253,7 @@ public class TestInterDatanodeProtocol {
final long recoveryid = gs + 1;
final ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl
.initReplicaRecovery(bpid, map, blocks[0], recoveryid,
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager);
assertEquals(originalInfo, recoveryInfo);
final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b);
@ -263,7 +264,7 @@ public class TestInterDatanodeProtocol {
final long recoveryid2 = gs + 2;
final ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl
.initReplicaRecovery(bpid, map, blocks[0], recoveryid2,
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager);
assertEquals(originalInfo, recoveryInfo2);
final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b);
@ -273,7 +274,7 @@ public class TestInterDatanodeProtocol {
//case RecoveryInProgressException
try {
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager);
Assert.fail();
}
catch(RecoveryInProgressException ripe) {
@ -286,7 +287,7 @@ public class TestInterDatanodeProtocol {
final Block b = new Block(firstblockid - 1, length, gs);
ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, b,
recoveryid,
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager);
Assert.assertNull("Data-node should not have this replica.", r);
}
@ -295,7 +296,7 @@ public class TestInterDatanodeProtocol {
final Block b = new Block(firstblockid + 1, length, gs);
try {
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager);
Assert.fail();
}
catch(IOException ioe) {
@ -309,7 +310,7 @@ public class TestInterDatanodeProtocol {
final Block b = new Block(firstblockid, length, gs+1);
try {
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager);
fail("InitReplicaRecovery should fail because replica's " +
"gs is less than the block's gs");
} catch (IOException e) {
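
initReplicaRecovery now takes the lock manager as an explicit argument, and ReplicaMap is constructed from the same manager rather than a ReentrantReadWriteLock. A minimal sketch of the new call shape follows; the class name, block values and recovery id are illustrative, and it sits in the fsdataset.impl package because ReplicaMap and initReplicaRecovery are package-scoped.

package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.IOException;

import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;

// Illustrative sketch, not part of this patch.
class ReplicaRecoverySketch {
  static ReplicaRecoveryInfo recover() throws IOException {
    DataSetLockManager manager = new DataSetLockManager();
    // The map is now built from the lock manager instead of a
    // ReentrantReadWriteLock; a bare new ReplicaMap() also exists for tests.
    ReplicaMap map = new ReplicaMap(manager);
    Block block = new Block(10000L, 22L, 7777L);
    // The manager is threaded through explicitly; with an empty map this
    // returns null, matching the "missing replica" assertion above.
    return FsDatasetImpl.initReplicaRecovery("BP-TEST", map, block, 7778L,
        DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager);
  }
}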

View File

@ -44,7 +44,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@ -66,6 +65,7 @@ import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfoVolumeReport;
@ -109,6 +109,7 @@ public class TestProvidedImpl {
private DataNode datanode;
private DataStorage storage;
private FsDatasetImpl dataset;
private DataSetLockManager manager = new DataSetLockManager();
private static Map<Long, String> blkToPathMap;
private static List<FsVolumeImpl> providedVolumes;
private static long spaceUsed = 0;
@ -319,6 +320,7 @@ public class TestProvidedImpl {
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
when(datanode.getConf()).thenReturn(conf);
when(datanode.getDataSetLockManager()).thenReturn(manager);
final DNConf dnConf = new DNConf(datanode);
when(datanode.getDnConf()).thenReturn(dnConf);
// reset the space used
@ -400,7 +402,7 @@ public class TestProvidedImpl {
public void testBlockLoad() throws IOException {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
ReplicaMap volumeMap = new ReplicaMap();
vol.getVolumeMap(volumeMap, null);
assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
@ -476,7 +478,7 @@ public class TestProvidedImpl {
vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID],
new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId,
numBlocks));
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
ReplicaMap volumeMap = new ReplicaMap();
vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null);
totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
}
@ -586,7 +588,7 @@ public class TestProvidedImpl {
public void testProvidedReplicaPrefix() throws Exception {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
ReplicaMap volumeMap = new ReplicaMap();
vol.getVolumeMap(volumeMap, null);
Path expectedPrefix = new Path(

View File

@ -26,13 +26,12 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Unit test for ReplicasMap class
*/
public class TestReplicaMap {
private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
private final ReplicaMap map = new ReplicaMap();
private final String bpid = "BP-TEST";
private final Block block = new Block(1234, 1234, 1234);
@ -112,7 +111,7 @@ public class TestReplicaMap {
@Test
public void testMergeAll() {
ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
ReplicaMap temReplicaMap = new ReplicaMap();
Block tmpBlock = new Block(5678, 5678, 5678);
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
@ -123,7 +122,7 @@ public class TestReplicaMap {
@Test
public void testAddAll() {
ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
ReplicaMap temReplicaMap = new ReplicaMap();
Block tmpBlock = new Block(5678, 5678, 5678);
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));

View File

@ -27,7 +27,6 @@ import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@ -550,7 +549,7 @@ public class TestWriteToReplica {
bpList.size() == 2);
createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
ReplicaMap oldReplicaMap = new ReplicaMap();
oldReplicaMap.addAll(dataSet.volumeMap);
cluster.restartDataNode(0);