Revert "HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. (#3200)"
This reverts commit 318bc5ff69.
This commit is contained in:
parent
318bc5ff69
commit
21fc30c1aa
|
@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock {
|
|||
private final Lock readLock;
|
||||
private final Lock writeLock;
|
||||
|
||||
public InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
|
||||
InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
|
||||
long minLoggingGapMs, long lockWarningThresholdMs) {
|
||||
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
|
||||
readLock = new InstrumentedReadLock(name, logger, readWriteLock,
|
||||
|
|
|
@ -549,17 +549,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
"dfs.lock.suppress.warning.interval";
|
||||
public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
|
||||
10000; //ms
|
||||
public static final String DFS_DATANODE_LOCK_FAIR_KEY =
|
||||
"dfs.datanode.lock.fair";
|
||||
public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
|
||||
public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
|
||||
"dfs.datanode.lock.read.write.enabled";
|
||||
public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
|
||||
true;
|
||||
public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
|
||||
"dfs.datanode.lock-reporting-threshold-ms";
|
||||
public static final long
|
||||
DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L;
|
||||
|
||||
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
|
||||
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
|
||||
|
|
|
@ -254,7 +254,7 @@ class BlockSender implements java.io.Closeable {
|
|||
// the append write.
|
||||
ChunkChecksum chunkChecksum = null;
|
||||
final long replicaVisibleLength;
|
||||
try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
|
||||
try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
|
||||
replica = getReplica(block, datanode);
|
||||
replicaVisibleLength = replica.getVisibleLength();
|
||||
}
|
||||
|
|
|
@ -3010,7 +3010,7 @@ public class DataNode extends ReconfigurableBase
|
|||
final BlockConstructionStage stage;
|
||||
|
||||
//get replica information
|
||||
try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
|
||||
try(AutoCloseableLock lock = data.acquireDatasetLock()) {
|
||||
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
|
||||
b.getBlockId());
|
||||
if (null == storedBlock) {
|
||||
|
|
|
@ -504,7 +504,7 @@ public class DiskBalancer {
|
|||
Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
|
||||
FsDatasetSpi.FsVolumeReferences references;
|
||||
try {
|
||||
try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
|
||||
try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
|
||||
references = this.dataset.getFsVolumeReferences();
|
||||
for (int ndx = 0; ndx < references.size(); ndx++) {
|
||||
FsVolumeSpi vol = references.get(ndx);
|
||||
|
|
|
@ -657,19 +657,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
FsVolumeSpi destination) throws IOException;
|
||||
|
||||
/**
|
||||
* Acquire the lock of the data set. This prevents other threads from
|
||||
* modifying the volume map structure inside the datanode, but other changes
|
||||
* are still possible. For example modifying the genStamp of a block instance.
|
||||
* Acquire the lock of the data set.
|
||||
*/
|
||||
AutoCloseableLock acquireDatasetLock();
|
||||
|
||||
/***
|
||||
* Acquire the read lock of the data set. This prevents other threads from
|
||||
* modifying the volume map structure inside the datanode, but other changes
|
||||
* are still possible. For example modifying the genStamp of a block instance.
|
||||
* @return The AutoCloseable read lock instance.
|
||||
*/
|
||||
AutoCloseableLock acquireDatasetReadLock();
|
||||
|
||||
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
|
||||
}
|
||||
|
|
|
@ -42,7 +42,6 @@ import java.util.concurrent.ForkJoinPool;
|
|||
import java.util.concurrent.ForkJoinTask;
|
||||
import java.util.concurrent.RecursiveAction;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -67,6 +66,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTrack
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DataChecksum.Type;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
|
@ -874,7 +874,7 @@ class BlockPoolSlice {
|
|||
|
||||
private boolean readReplicasFromCache(ReplicaMap volumeMap,
|
||||
final RamDiskReplicaTracker lazyWriteReplicaMap) {
|
||||
ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||
ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
||||
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
|
||||
// Check whether the file exists or not.
|
||||
if (!replicaFile.exists()) {
|
||||
|
|
|
@ -40,6 +40,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
|
@ -111,7 +112,7 @@ import org.apache.hadoop.util.Daemon;
|
|||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
import org.apache.hadoop.util.InstrumentedReadWriteLock;
|
||||
import org.apache.hadoop.util.InstrumentedLock;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
|
@ -178,7 +179,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
@Override
|
||||
public FsVolumeImpl getVolume(final ExtendedBlock b) {
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final ReplicaInfo r =
|
||||
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
||||
return r != null ? (FsVolumeImpl) r.getVolume() : null;
|
||||
|
@ -188,7 +189,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
@Override // FsDatasetSpi
|
||||
public Block getStoredBlock(String bpid, long blkid)
|
||||
throws IOException {
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo r = volumeMap.get(bpid, blkid);
|
||||
if (r == null) {
|
||||
return null;
|
||||
|
@ -201,16 +202,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
* The deepCopyReplica call doesn't use the datasetLock since it will lead to a
|
||||
* potential deadlock with the {@link FsVolumeList#addBlockPool} call.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||
throws IOException {
|
||||
Set<? extends Replica> replicas;
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
replicas =
|
||||
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
|
||||
: volumeMap.replicas(bpid));
|
||||
}
|
||||
Set<? extends Replica> replicas =
|
||||
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
|
||||
: volumeMap.replicas(bpid));
|
||||
return Collections.unmodifiableSet(replicas);
|
||||
}
|
||||
|
||||
|
@ -271,12 +268,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
private final int maxDataLength;
|
||||
|
||||
@VisibleForTesting
|
||||
final AutoCloseableLock datasetWriteLock;
|
||||
@VisibleForTesting
|
||||
final AutoCloseableLock datasetReadLock;
|
||||
@VisibleForTesting
|
||||
final InstrumentedReadWriteLock datasetRWLock;
|
||||
private final Condition datasetWriteLockCondition;
|
||||
final AutoCloseableLock datasetLock;
|
||||
private final Condition datasetLockCondition;
|
||||
private static String blockPoolId = "";
|
||||
|
||||
/**
|
||||
|
@ -289,33 +282,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
this.dataStorage = storage;
|
||||
this.conf = conf;
|
||||
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
|
||||
this.datasetRWLock = new InstrumentedReadWriteLock(
|
||||
conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT),
|
||||
"FsDatasetRWLock", LOG, conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS),
|
||||
conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
|
||||
TimeUnit.MILLISECONDS));
|
||||
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
|
||||
boolean enableRL = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
|
||||
// The read lock can be disabled by the above config key. If it is disabled
|
||||
// then we simply make both the read and write lock variables hold
|
||||
// the write lock. All accesses to the lock are via these variables, so that
|
||||
// effectively disables the read lock.
|
||||
if (enableRL) {
|
||||
LOG.info("The datanode lock is a read write lock");
|
||||
this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
|
||||
} else {
|
||||
LOG.info("The datanode lock is an exclusive write lock");
|
||||
this.datasetReadLock = this.datasetWriteLock;
|
||||
}
|
||||
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
|
||||
this.datasetLock = new AutoCloseableLock(
|
||||
new InstrumentedLock(getClass().getName(), LOG,
|
||||
new ReentrantLock(true),
|
||||
conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS),
|
||||
300));
|
||||
this.datasetLockCondition = datasetLock.newCondition();
|
||||
|
||||
// The number of volumes required for operation is the total number
|
||||
// of volumes minus the number of failed volumes we can tolerate.
|
||||
|
@ -354,7 +329,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
|
||||
volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
|
||||
volumeMap = new ReplicaMap(datasetLock);
|
||||
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -408,12 +383,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
@Override
|
||||
public AutoCloseableLock acquireDatasetLock() {
|
||||
return datasetWriteLock.acquire();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseableLock acquireDatasetReadLock() {
|
||||
return datasetReadLock.acquire();
|
||||
return datasetLock.acquire();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -454,7 +424,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
ReplicaMap replicaMap,
|
||||
Storage.StorageDirectory sd, StorageType storageType,
|
||||
FsVolumeReference ref) throws IOException {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
|
||||
if (dnStorage != null) {
|
||||
final String errorMsg = String.format(
|
||||
|
@ -487,8 +457,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
.setConf(this.conf)
|
||||
.build();
|
||||
FsVolumeReference ref = fsVolume.obtainReference();
|
||||
ReplicaMap tempVolumeMap =
|
||||
new ReplicaMap(datasetReadLock, datasetWriteLock);
|
||||
ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
|
||||
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
|
||||
|
||||
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
|
||||
|
@ -527,8 +496,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
StorageType storageType = location.getStorageType();
|
||||
final FsVolumeImpl fsVolume =
|
||||
createFsVolume(sd.getStorageUuid(), sd, location);
|
||||
final ReplicaMap tempVolumeMap =
|
||||
new ReplicaMap(datasetReadLock, datasetWriteLock);
|
||||
final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
|
||||
ArrayList<IOException> exceptions = Lists.newArrayList();
|
||||
|
||||
for (final NamespaceInfo nsInfo : nsInfos) {
|
||||
|
@ -573,7 +541,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
new ArrayList<>(storageLocsToRemove);
|
||||
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
|
||||
List<String> storageToRemove = new ArrayList<>();
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
||||
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
||||
final StorageLocation sdLocation = sd.getStorageLocation();
|
||||
|
@ -585,7 +553,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
// Disable the volume from the service.
|
||||
asyncDiskService.removeVolume(sd.getStorageUuid());
|
||||
volumes.removeVolume(sdLocation, clearFailure);
|
||||
volumes.waitVolumeRemoved(5000, datasetWriteLockCondition);
|
||||
volumes.waitVolumeRemoved(5000, datasetLockCondition);
|
||||
|
||||
// Removed all replica information for the blocks on the volume.
|
||||
// Unlike updating the volumeMap in addVolume(), this operation does
|
||||
|
@ -632,7 +600,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
}
|
||||
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
for(String storageUuid : storageToRemove) {
|
||||
storageMap.remove(storageUuid);
|
||||
}
|
||||
|
@ -823,7 +791,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
long seekOffset) throws IOException {
|
||||
|
||||
ReplicaInfo info;
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
||||
}
|
||||
|
||||
|
@ -911,7 +879,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
@Override // FsDatasetSpi
|
||||
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
||||
long blkOffset, long metaOffset) throws IOException {
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo info = getReplicaInfo(b);
|
||||
FsVolumeReference ref = info.getVolume().obtainReference();
|
||||
try {
|
||||
|
@ -1036,7 +1004,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
FsVolumeReference volumeRef = null;
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
|
||||
block.getNumBytes());
|
||||
}
|
||||
|
@ -1150,7 +1118,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
FsVolumeReference volumeRef = null;
|
||||
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
volumeRef = destination.obtainReference();
|
||||
}
|
||||
|
||||
|
@ -1238,7 +1206,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
@Override // FsDatasetSpi
|
||||
public ReplicaHandler append(ExtendedBlock b,
|
||||
long newGS, long expectedBlockLen) throws IOException {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
// If the block was successfully finalized because all packets
|
||||
// were successfully processed at the Datanode but the ack for
|
||||
// some of the packets were not received by the client. The client
|
||||
|
@ -1290,7 +1258,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
private ReplicaInPipeline append(String bpid,
|
||||
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
|
||||
throws IOException {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
// If the block is cached, start uncaching it.
|
||||
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
|
||||
throw new IOException("Only a Finalized replica can be appended to; "
|
||||
|
@ -1386,7 +1354,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
while (true) {
|
||||
try {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
||||
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
|
||||
ReplicaInPipeline replica;
|
||||
|
@ -1418,7 +1386,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
LOG.info("Recover failed close " + b);
|
||||
while (true) {
|
||||
try {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
// check replica's state
|
||||
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
||||
// bump the replica's GS
|
||||
|
@ -1440,7 +1408,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
public ReplicaHandler createRbw(
|
||||
StorageType storageType, String storageId, ExtendedBlock b,
|
||||
boolean allowLazyPersist) throws IOException {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
||||
b.getBlockId());
|
||||
if (replicaInfo != null) {
|
||||
|
@ -1511,7 +1479,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
while (true) {
|
||||
try {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo replicaInfo =
|
||||
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
|
||||
// check the replica's state
|
||||
|
@ -1536,7 +1504,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
|
||||
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
|
||||
throws IOException {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
// check generation stamp
|
||||
long replicaGenerationStamp = rbw.getGenerationStamp();
|
||||
if (replicaGenerationStamp < b.getGenerationStamp() ||
|
||||
|
@ -1597,7 +1565,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
public ReplicaInPipeline convertTemporaryToRbw(
|
||||
final ExtendedBlock b) throws IOException {
|
||||
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final long blockId = b.getBlockId();
|
||||
final long expectedGs = b.getGenerationStamp();
|
||||
final long visible = b.getNumBytes();
|
||||
|
@ -1671,7 +1639,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
ReplicaInfo lastFoundReplicaInfo = null;
|
||||
boolean isInPipeline = false;
|
||||
do {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo currentReplicaInfo =
|
||||
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
|
||||
if (currentReplicaInfo == lastFoundReplicaInfo) {
|
||||
|
@ -1724,7 +1692,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
|
||||
false);
|
||||
}
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
|
||||
.getNumBytes());
|
||||
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
|
||||
|
@ -1775,7 +1743,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
throws IOException {
|
||||
ReplicaInfo replicaInfo = null;
|
||||
ReplicaInfo finalizedReplicaInfo = null;
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
if (Thread.interrupted()) {
|
||||
// Don't allow data modifications from interrupted threads
|
||||
throw new IOException("Cannot finalize block from Interrupted Thread");
|
||||
|
@ -1806,7 +1774,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
|
||||
throws IOException {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
// Compare generation stamp of old and new replica before finalizing
|
||||
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
|
||||
> replicaInfo.getGenerationStamp()) {
|
||||
|
@ -1851,7 +1819,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
*/
|
||||
@Override // FsDatasetSpi
|
||||
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
||||
b.getLocalBlock());
|
||||
if (replicaInfo != null &&
|
||||
|
@ -1904,7 +1872,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
new HashMap<String, BlockListAsLongs.Builder>();
|
||||
|
||||
List<FsVolumeImpl> curVolumes = null;
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
curVolumes = volumes.getVolumes();
|
||||
for (FsVolumeSpi v : curVolumes) {
|
||||
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
|
||||
|
@ -1959,7 +1927,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
* Gets a list of references to the finalized blocks for the given block pool.
|
||||
* <p>
|
||||
* Callers of this function should call
|
||||
* {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
|
||||
* {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
|
||||
* changed during list iteration.
|
||||
* </p>
|
||||
* @return a list of references to the finalized blocks for the given block
|
||||
|
@ -1967,7 +1935,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
*/
|
||||
@Override
|
||||
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
|
||||
volumeMap.size(bpid));
|
||||
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
||||
|
@ -2060,7 +2028,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
ReplicaInfo validateBlockFile(String bpid, long blockId) {
|
||||
//Should we check for metadata file too?
|
||||
final ReplicaInfo r;
|
||||
r = volumeMap.get(bpid, blockId);
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
r = volumeMap.get(bpid, blockId);
|
||||
}
|
||||
if (r != null) {
|
||||
if (r.blockDataExists()) {
|
||||
return r;
|
||||
|
@ -2109,7 +2079,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
for (int i = 0; i < invalidBlks.length; i++) {
|
||||
final ReplicaInfo removing;
|
||||
final FsVolumeImpl v;
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
|
||||
if (info == null) {
|
||||
ReplicaInfo infoByBlockId =
|
||||
|
@ -2235,7 +2205,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
long length, genstamp;
|
||||
Executor volumeExecutor;
|
||||
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo info = volumeMap.get(bpid, blockId);
|
||||
boolean success = false;
|
||||
try {
|
||||
|
@ -2303,7 +2273,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
@Override // FsDatasetSpi
|
||||
public boolean contains(final ExtendedBlock block) {
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final long blockId = block.getLocalBlock().getBlockId();
|
||||
final String bpid = block.getBlockPoolId();
|
||||
final ReplicaInfo r = volumeMap.get(bpid, blockId);
|
||||
|
@ -2423,7 +2393,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
Block corruptBlock = null;
|
||||
ReplicaInfo memBlockInfo;
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
memBlockInfo = volumeMap.get(bpid, blockId);
|
||||
if (memBlockInfo != null &&
|
||||
memBlockInfo.getState() != ReplicaState.FINALIZED) {
|
||||
|
@ -2624,7 +2594,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
@Override
|
||||
public String getReplicaString(String bpid, long blockId) {
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final Replica r = volumeMap.get(bpid, blockId);
|
||||
return r == null ? "null" : r.toString();
|
||||
}
|
||||
|
@ -2731,7 +2701,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final long recoveryId,
|
||||
final long newBlockId,
|
||||
final long newlength) throws IOException {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
//get replica
|
||||
final String bpid = oldBlock.getBlockPoolId();
|
||||
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
|
||||
|
@ -2844,7 +2814,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
@Override // FsDatasetSpi
|
||||
public long getReplicaVisibleLength(final ExtendedBlock block)
|
||||
throws IOException {
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
|
||||
block.getBlockId());
|
||||
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
||||
|
@ -2861,7 +2831,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
throws IOException {
|
||||
LOG.info("Adding block pool " + bpid);
|
||||
AddBlockPoolException volumeExceptions = new AddBlockPoolException();
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
try {
|
||||
volumes.addBlockPool(bpid, conf);
|
||||
} catch (AddBlockPoolException e) {
|
||||
|
@ -2891,7 +2861,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
@Override
|
||||
public void shutdownBlockPool(String bpid) {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
LOG.info("Removing block pool " + bpid);
|
||||
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
|
||||
= getBlockReports(bpid);
|
||||
|
@ -2965,7 +2935,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
@Override //FsDatasetSpi
|
||||
public void deleteBlockPool(String bpid, boolean force)
|
||||
throws IOException {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
|
||||
if (!force) {
|
||||
for (FsVolumeImpl volume : curVolumes) {
|
||||
|
@ -2994,20 +2964,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
@Override // FsDatasetSpi
|
||||
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
|
||||
throws IOException {
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final Replica replica = volumeMap.get(block.getBlockPoolId(),
|
||||
block.getBlockId());
|
||||
if (replica == null) {
|
||||
throw new ReplicaNotFoundException(block);
|
||||
}
|
||||
synchronized(replica) {
|
||||
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
||||
throw new IOException(
|
||||
"Replica generation stamp < block generation stamp, block="
|
||||
+ block + ", replica=" + replica);
|
||||
} else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
|
||||
block.setGenerationStamp(replica.getGenerationStamp());
|
||||
}
|
||||
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
||||
throw new IOException(
|
||||
"Replica generation stamp < block generation stamp, block="
|
||||
+ block + ", replica=" + replica);
|
||||
} else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
|
||||
block.setGenerationStamp(replica.getGenerationStamp());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3048,7 +3016,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
@Override
|
||||
public void onCompleteLazyPersist(String bpId, long blockId,
|
||||
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
|
||||
|
||||
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
|
||||
|
@ -3182,7 +3150,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
try {
|
||||
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
|
||||
if (block != null) {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
|
||||
|
||||
// If replicaInfo is null, the block was either deleted before
|
||||
|
@ -3249,7 +3217,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
ReplicaInfo replicaInfo, newReplicaInfo;
|
||||
final String bpid = replicaState.getBlockPoolId();
|
||||
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
|
||||
replicaState.getBlockId());
|
||||
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
|
||||
|
@ -3422,7 +3390,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
void stopAllDataxceiverThreads(FsVolumeImpl volume) {
|
||||
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
for (String bpid : volumeMap.getBlockPoolList()) {
|
||||
Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
|
||||
for (ReplicaInfo replicaInfo : replicas) {
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.Set;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -54,10 +53,9 @@ import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.codehaus.jackson.annotate.JsonProperty;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.map.ObjectReader;
|
||||
|
@ -65,6 +63,8 @@ import org.codehaus.jackson.map.ObjectWriter;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES;
|
||||
|
||||
|
@ -135,7 +135,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
|
|||
ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
|
||||
Configuration conf) {
|
||||
this.providedVolume = volume;
|
||||
bpVolumeMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||
bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
|
||||
Class<? extends BlockAliasMap> fmt =
|
||||
conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
|
||||
TextFileRegionAliasMap.class, BlockAliasMap.class);
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
|||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
|
@ -33,29 +32,22 @@ import org.apache.hadoop.util.AutoCloseableLock;
|
|||
*/
|
||||
class ReplicaMap {
|
||||
// Lock object to synchronize this instance.
|
||||
private final AutoCloseableLock readLock;
|
||||
private final AutoCloseableLock writeLock;
|
||||
private final AutoCloseableLock lock;
|
||||
|
||||
// Map of block pool Id to another map of block Id to ReplicaInfo.
|
||||
private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
|
||||
new HashMap<>();
|
||||
|
||||
ReplicaMap(AutoCloseableLock rLock, AutoCloseableLock wLock) {
|
||||
if (rLock == null || wLock == null) {
|
||||
ReplicaMap(AutoCloseableLock lock) {
|
||||
if (lock == null) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Lock to synchronize on cannot be null");
|
||||
}
|
||||
this.readLock = rLock;
|
||||
this.writeLock = wLock;
|
||||
}
|
||||
|
||||
ReplicaMap(ReadWriteLock lock) {
|
||||
this(new AutoCloseableLock(lock.readLock()),
|
||||
new AutoCloseableLock(lock.writeLock()));
|
||||
this.lock = lock;
|
||||
}
|
||||
|
||||
String[] getBlockPoolList() {
|
||||
try (AutoCloseableLock l = readLock.acquire()) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
return map.keySet().toArray(new String[map.keySet().size()]);
|
||||
}
|
||||
}
|
||||
|
@ -100,7 +92,7 @@ class ReplicaMap {
|
|||
*/
|
||||
ReplicaInfo get(String bpid, long blockId) {
|
||||
checkBlockPool(bpid);
|
||||
try (AutoCloseableLock l = readLock.acquire()) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
return m != null ? m.get(new Block(blockId)) : null;
|
||||
}
|
||||
|
@ -117,7 +109,7 @@ class ReplicaMap {
|
|||
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
|
||||
checkBlockPool(bpid);
|
||||
checkBlock(replicaInfo);
|
||||
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
if (m == null) {
|
||||
// Add an entry for block pool if it does not exist already
|
||||
|
@ -135,7 +127,7 @@ class ReplicaMap {
|
|||
ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
|
||||
checkBlockPool(bpid);
|
||||
checkBlock(replicaInfo);
|
||||
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
if (m == null) {
|
||||
// Add an entry for block pool if it does not exist already
|
||||
|
@ -184,7 +176,7 @@ class ReplicaMap {
|
|||
ReplicaInfo remove(String bpid, Block block) {
|
||||
checkBlockPool(bpid);
|
||||
checkBlock(block);
|
||||
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
if (m != null) {
|
||||
ReplicaInfo replicaInfo = m.get(block);
|
||||
|
@ -206,7 +198,7 @@ class ReplicaMap {
|
|||
*/
|
||||
ReplicaInfo remove(String bpid, long blockId) {
|
||||
checkBlockPool(bpid);
|
||||
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
if (m != null) {
|
||||
return m.remove(new Block(blockId));
|
||||
|
@ -221,7 +213,7 @@ class ReplicaMap {
|
|||
* @return the number of replicas in the map
|
||||
*/
|
||||
int size(String bpid) {
|
||||
try (AutoCloseableLock l = readLock.acquire()) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
return m != null ? m.size() : 0;
|
||||
}
|
||||
|
@ -245,7 +237,7 @@ class ReplicaMap {
|
|||
|
||||
void initBlockPool(String bpid) {
|
||||
checkBlockPool(bpid);
|
||||
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||
if (m == null) {
|
||||
// Add an entry for block pool if it does not exist already
|
||||
|
@ -257,7 +249,7 @@ class ReplicaMap {
|
|||
|
||||
void cleanUpBlockPool(String bpid) {
|
||||
checkBlockPool(bpid);
|
||||
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||
try (AutoCloseableLock l = lock.acquire()) {
|
||||
map.remove(bpid);
|
||||
}
|
||||
}
|
||||
|
@ -267,16 +259,6 @@ class ReplicaMap {
|
|||
* @return lock object
|
||||
*/
|
||||
AutoCloseableLock getLock() {
|
||||
return writeLock;
|
||||
return lock;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the lock object used for synchronizing the ReplicaMap for read-only
|
||||
* operations.
|
||||
* @return The read lock object
|
||||
*/
|
||||
AutoCloseableLock getReadLock() {
|
||||
return readLock;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2999,40 +2999,6 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.datanode.lock.fair</name>
|
||||
<value>true</value>
|
||||
<description>If this is true, the Datanode FsDataset lock will be used in Fair
|
||||
mode, which will help to prevent writer threads from being starved, but can
|
||||
lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock
|
||||
for more information on fair/non-fair locks.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.datanode.lock.read.write.enabled</name>
|
||||
<value>true</value>
|
||||
<description>If this is true, the FsDataset lock will be a read write lock. If
|
||||
it is false, all lock acquisitions will use the exclusive write lock.
|
||||
Enabling this should give better datanode throughput, as many read only
|
||||
functions can run concurrently under the read lock, when they would
|
||||
previously have required the exclusive write lock. As the feature is
|
||||
experimental, this switch can be used to disable the shared read lock, and
|
||||
cause all lock acquisitions to use the exclusive write lock.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.datanode.lock-reporting-threshold-ms</name>
|
||||
<value>300</value>
|
||||
<description>When a thread waits to obtain a lock, or a thread holds a lock for
|
||||
more than the threshold, a log message will be written. Note that
|
||||
dfs.lock.suppress.warning.interval ensures a single log message is
|
||||
emitted per interval for waiting threads and a single message for holding
|
||||
threads to avoid excessive logging.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.startup.delay.block.deletion.sec</name>
|
||||
<value>0</value>
|
||||
|
|
|
@ -1572,12 +1572,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
return datasetLock.acquire();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseableLock acquireDatasetReadLock() {
|
||||
// No RW lock implementation in simulated dataset currently.
|
||||
return datasetLock.acquire();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||
throws IOException {
|
||||
|
|
|
@ -455,11 +455,6 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseableLock acquireDatasetReadLock() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||
throws IOException {
|
||||
|
|
|
@ -434,7 +434,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
|
|||
@Override
|
||||
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
|
||||
// Reload replicas from the disk.
|
||||
ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock);
|
||||
ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock);
|
||||
try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
|
||||
for (FsVolumeSpi vol : refs) {
|
||||
FsVolumeImpl volume = (FsVolumeImpl) vol;
|
||||
|
|
|
@ -17,10 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Supplier;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
@ -65,7 +65,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.FakeTimer;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.Assert;
|
||||
|
@ -86,7 +85,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
|
||||
|
@ -198,118 +196,6 @@ public class TestFsDatasetImpl {
|
|||
assertEquals(0, dataset.getNumFailedVolumes());
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testReadLockEnabledByDefault()
|
||||
throws Exception {
|
||||
final FsDatasetSpi ds = dataset;
|
||||
AtomicBoolean accessed = new AtomicBoolean(false);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
CountDownLatch waiterLatch = new CountDownLatch(1);
|
||||
|
||||
Thread holder = new Thread() {
|
||||
public void run() {
|
||||
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
||||
latch.countDown();
|
||||
// wait for the waiter thread to access the lock.
|
||||
waiterLatch.await();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Thread waiter = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
waiterLatch.countDown();
|
||||
return;
|
||||
}
|
||||
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
||||
accessed.getAndSet(true);
|
||||
// signal the holder thread.
|
||||
waiterLatch.countDown();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
waiter.start();
|
||||
holder.start();
|
||||
holder.join();
|
||||
waiter.join();
|
||||
// The holder thread is still holding the lock, but the waiter can still
|
||||
// run as the lock is a shared read lock.
|
||||
// Otherwise test will timeout with deadlock.
|
||||
assertEquals(true, accessed.get());
|
||||
holder.interrupt();
|
||||
}
|
||||
|
||||
@Test(timeout=20000)
|
||||
public void testReadLockCanBeDisabledByConfig()
|
||||
throws Exception {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1).build();
|
||||
try {
|
||||
AtomicBoolean accessed = new AtomicBoolean(false);
|
||||
cluster.waitActive();
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
CountDownLatch waiterLatch = new CountDownLatch(1);
|
||||
Thread holder = new Thread() {
|
||||
public void run() {
|
||||
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
||||
latch.countDown();
|
||||
// wait for the waiter thread to access the lock.
|
||||
waiterLatch.await();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Thread waiter = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
// Wait for holder to get ds read lock.
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
waiterLatch.countDown();
|
||||
return;
|
||||
}
|
||||
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
||||
accessed.getAndSet(true);
|
||||
// signal the holder thread.
|
||||
waiterLatch.countDown();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
waiter.start();
|
||||
holder.start();
|
||||
// Wait for some time to make sure we are in deadlock.
|
||||
try {
|
||||
GenericTestUtils.waitFor(() ->
|
||||
accessed.get(),
|
||||
100, 10000);
|
||||
fail("Waiter thread should not execute.");
|
||||
} catch (TimeoutException e) {
|
||||
}
|
||||
// Release waiterLatch to exit deadlock.
|
||||
waiterLatch.countDown();
|
||||
holder.join();
|
||||
waiter.join();
|
||||
// After releasing waiterLatch, the waiter
|
||||
// thread will be able to execute.
|
||||
assertTrue(accessed.get());
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddVolumes() throws IOException {
|
||||
final int numNewVolumes = 3;
|
||||
|
@ -356,8 +242,8 @@ public class TestFsDatasetImpl {
|
|||
|
||||
@Test
|
||||
public void testAddVolumeWithSameStorageUuid() throws IOException {
|
||||
HdfsConfiguration config = new HdfsConfiguration();
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -52,7 +53,6 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -368,7 +368,7 @@ public class TestFsVolumeList {
|
|||
fs.close();
|
||||
FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
|
||||
.getFSDataset();
|
||||
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
|
||||
RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
|
||||
.getInstance(conf, fsDataset);
|
||||
FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -60,6 +59,7 @@ import org.apache.hadoop.io.Writable;
|
|||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -236,7 +236,7 @@ public class TestInterDatanodeProtocol {
|
|||
final long firstblockid = 10000L;
|
||||
final long gs = 7777L;
|
||||
final long length = 22L;
|
||||
final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
|
||||
final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
|
||||
String bpid = "BP-TEST";
|
||||
final Block[] blocks = new Block[5];
|
||||
for(int i = 0; i < blocks.length; i++) {
|
||||
|
|
|
@ -43,7 +43,6 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -78,6 +77,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -399,7 +399,7 @@ public class TestProvidedImpl {
|
|||
public void testBlockLoad() throws IOException {
|
||||
for (int i = 0; i < providedVolumes.size(); i++) {
|
||||
FsVolumeImpl vol = providedVolumes.get(i);
|
||||
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
|
||||
vol.getVolumeMap(volumeMap, null);
|
||||
|
||||
assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
|
||||
|
@ -475,7 +475,7 @@ public class TestProvidedImpl {
|
|||
vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID],
|
||||
new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId,
|
||||
numBlocks));
|
||||
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
|
||||
vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null);
|
||||
totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
|
||||
}
|
||||
|
@ -585,7 +585,7 @@ public class TestProvidedImpl {
|
|||
public void testProvidedReplicaPrefix() throws Exception {
|
||||
for (int i = 0; i < providedVolumes.size(); i++) {
|
||||
FsVolumeImpl vol = providedVolumes.get(i);
|
||||
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
|
||||
vol.getVolumeMap(volumeMap, null);
|
||||
|
||||
Path expectedPrefix = new Path(
|
||||
|
|
|
@ -23,16 +23,15 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
/**
|
||||
* Unit test for the ReplicaMap class.
|
||||
*/
|
||||
public class TestReplicaMap {
|
||||
private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
|
||||
private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
|
||||
private final String bpid = "BP-TEST";
|
||||
private final Block block = new Block(1234, 1234, 1234);
|
||||
|
||||
|
@ -112,7 +111,7 @@ public class TestReplicaMap {
|
|||
|
||||
@Test
|
||||
public void testMergeAll() {
|
||||
ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
||||
Block tmpBlock = new Block(5678, 5678, 5678);
|
||||
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
|
||||
|
||||
|
@ -123,7 +122,7 @@ public class TestReplicaMap {
|
|||
|
||||
@Test
|
||||
public void testAddAll() {
|
||||
ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
||||
Block tmpBlock = new Block(5678, 5678, 5678);
|
||||
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.io.RandomAccessFile;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
|
@ -48,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -550,7 +550,7 @@ public class TestWriteToReplica {
|
|||
bpList.size() == 2);
|
||||
|
||||
createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
|
||||
ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||
ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
||||
oldReplicaMap.addAll(dataSet.volumeMap);
|
||||
|
||||
cluster.restartDataNode(0);
|
||||
|
|
Loading…
Reference in New Issue