HDFS-15150. Introduce read write lock to Datanode. Contributed Stephen O'Donnell.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit d7c136b9ed)
This commit is contained in:
Stephen O'Donnell 2020-02-11 07:59:34 -08:00 committed by He Xiaoqiao
parent ca182b6e5f
commit 7039433146
No known key found for this signature in database
GPG Key ID: A80CC124E9A0FA63
16 changed files with 168 additions and 98 deletions

View File

@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock {
private final Lock readLock; private final Lock readLock;
private final Lock writeLock; private final Lock writeLock;
InstrumentedReadWriteLock(boolean fair, String name, Logger logger, public InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
long minLoggingGapMs, long lockWarningThresholdMs) { long minLoggingGapMs, long lockWarningThresholdMs) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair); ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
readLock = new InstrumentedReadLock(name, logger, readWriteLock, readLock = new InstrumentedReadLock(name, logger, readWriteLock,

View File

@ -549,6 +549,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.lock.suppress.warning.interval"; "dfs.lock.suppress.warning.interval";
public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT = public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
10000; //ms 10000; //ms
public static final String DFS_DATANODE_LOCK_FAIR_KEY =
"dfs.datanode.lock.fair";
public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
"dfs.datanode.lock-reporting-threshold-ms";
public static final long
DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L;
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor"; public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT; public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;

View File

@ -661,5 +661,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/ */
AutoCloseableLock acquireDatasetLock(); AutoCloseableLock acquireDatasetLock();
/***
* Acquire the read lock of the data set.
* @return The AutoClosable read lock instance.
*/
AutoCloseableLock acquireDatasetReadLock();
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException; Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
} }

View File

@ -42,6 +42,7 @@ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed; import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -66,7 +67,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTrack
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker;
@ -874,7 +874,7 @@ class BlockPoolSlice {
private boolean readReplicasFromCache(ReplicaMap volumeMap, private boolean readReplicasFromCache(ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap) { final RamDiskReplicaTracker lazyWriteReplicaMap) {
ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock()); ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE); File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
// Check whether the file exists or not. // Check whether the file exists or not.
if (!replicaFile.exists()) { if (!replicaFile.exists()) {

View File

@ -40,8 +40,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.NotCompliantMBeanException; import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -112,7 +112,7 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.InstrumentedLock; import org.apache.hadoop.util.InstrumentedReadWriteLock;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer; import org.apache.hadoop.util.Timer;
@ -179,7 +179,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override @Override
public FsVolumeImpl getVolume(final ExtendedBlock b) { public FsVolumeImpl getVolume(final ExtendedBlock b) {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final ReplicaInfo r = final ReplicaInfo r =
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null ? (FsVolumeImpl) r.getVolume() : null; return r != null ? (FsVolumeImpl) r.getVolume() : null;
@ -189,7 +189,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi @Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid) public Block getStoredBlock(String bpid, long blkid)
throws IOException { throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo r = volumeMap.get(bpid, blkid); ReplicaInfo r = volumeMap.get(bpid, blkid);
if (r == null) { if (r == null) {
return null; return null;
@ -205,9 +205,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override @Override
public Set<? extends Replica> deepCopyReplica(String bpid) public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException { throws IOException {
Set<? extends Replica> replicas = Set<? extends Replica> replicas = null;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
replicas =
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
: volumeMap.replicas(bpid)); : volumeMap.replicas(bpid));
}
return Collections.unmodifiableSet(replicas); return Collections.unmodifiableSet(replicas);
} }
@ -268,8 +272,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private final int maxDataLength; private final int maxDataLength;
@VisibleForTesting @VisibleForTesting
final AutoCloseableLock datasetLock; final AutoCloseableLock datasetWriteLock;
private final Condition datasetLockCondition; @VisibleForTesting
final AutoCloseableLock datasetReadLock;
@VisibleForTesting
final InstrumentedReadWriteLock datasetRWLock;
private final Condition datasetWriteLockCondition;
private static String blockPoolId = ""; private static String blockPoolId = "";
/** /**
@ -282,15 +290,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.dataStorage = storage; this.dataStorage = storage;
this.conf = conf; this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
this.datasetLock = new AutoCloseableLock( this.datasetRWLock = new InstrumentedReadWriteLock(
new InstrumentedLock(getClass().getName(), LOG, conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
new ReentrantLock(true), DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT),
conf.getTimeDuration( "FsDatasetRWLock", LOG, conf.getTimeDuration(
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS),
300)); conf.getTimeDuration(
this.datasetLockCondition = datasetLock.newCondition(); DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
TimeUnit.MILLISECONDS));
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
// The number of volumes required for operation is the total number // The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate. // of volumes minus the number of failed volumes we can tolerate.
@ -329,7 +342,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
storageMap = new ConcurrentHashMap<String, DatanodeStorage>(); storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(datasetLock); volumeMap = new ReplicaMap(datasetRWLock);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -383,7 +396,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override @Override
public AutoCloseableLock acquireDatasetLock() { public AutoCloseableLock acquireDatasetLock() {
return datasetLock.acquire(); return datasetWriteLock.acquire();
}
@Override
public AutoCloseableLock acquireDatasetReadLock() {
return datasetReadLock.acquire();
} }
/** /**
@ -424,7 +442,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaMap replicaMap, ReplicaMap replicaMap,
Storage.StorageDirectory sd, StorageType storageType, Storage.StorageDirectory sd, StorageType storageType,
FsVolumeReference ref) throws IOException { FsVolumeReference ref) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid()); DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
if (dnStorage != null) { if (dnStorage != null) {
final String errorMsg = String.format( final String errorMsg = String.format(
@ -457,7 +475,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setConf(this.conf) .setConf(this.conf)
.build(); .build();
FsVolumeReference ref = fsVolume.obtainReference(); FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock); ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref); activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@ -496,7 +514,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
StorageType storageType = location.getStorageType(); StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume = final FsVolumeImpl fsVolume =
createFsVolume(sd.getStorageUuid(), sd, location); createFsVolume(sd.getStorageUuid(), sd, location);
final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock()); final ReplicaMap tempVolumeMap =
new ReplicaMap(new ReentrantReadWriteLock());
ArrayList<IOException> exceptions = Lists.newArrayList(); ArrayList<IOException> exceptions = Lists.newArrayList();
for (final NamespaceInfo nsInfo : nsInfos) { for (final NamespaceInfo nsInfo : nsInfos) {
@ -541,7 +560,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new ArrayList<>(storageLocsToRemove); new ArrayList<>(storageLocsToRemove);
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>(); Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
List<String> storageToRemove = new ArrayList<>(); List<String> storageToRemove = new ArrayList<>();
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
final StorageLocation sdLocation = sd.getStorageLocation(); final StorageLocation sdLocation = sd.getStorageLocation();
@ -553,7 +572,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Disable the volume from the service. // Disable the volume from the service.
asyncDiskService.removeVolume(sd.getStorageUuid()); asyncDiskService.removeVolume(sd.getStorageUuid());
volumes.removeVolume(sdLocation, clearFailure); volumes.removeVolume(sdLocation, clearFailure);
volumes.waitVolumeRemoved(5000, datasetLockCondition); volumes.waitVolumeRemoved(5000, datasetWriteLockCondition);
// Removed all replica information for the blocks on the volume. // Removed all replica information for the blocks on the volume.
// Unlike updating the volumeMap in addVolume(), this operation does // Unlike updating the volumeMap in addVolume(), this operation does
@ -600,7 +619,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
} }
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for(String storageUuid : storageToRemove) { for(String storageUuid : storageToRemove) {
storageMap.remove(storageUuid); storageMap.remove(storageUuid);
} }
@ -791,7 +810,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long seekOffset) throws IOException { long seekOffset) throws IOException {
ReplicaInfo info; ReplicaInfo info;
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
} }
@ -879,7 +898,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi @Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException { long blkOffset, long metaOffset) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo info = getReplicaInfo(b); ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference(); FsVolumeReference ref = info.getVolume().obtainReference();
try { try {
@ -1004,7 +1023,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
FsVolumeReference volumeRef = null; FsVolumeReference volumeRef = null;
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId, volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
block.getNumBytes()); block.getNumBytes());
} }
@ -1118,7 +1137,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeReference volumeRef = null; FsVolumeReference volumeRef = null;
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
volumeRef = destination.obtainReference(); volumeRef = destination.obtainReference();
} }
@ -1206,7 +1225,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi @Override // FsDatasetSpi
public ReplicaHandler append(ExtendedBlock b, public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException { long newGS, long expectedBlockLen) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// If the block was successfully finalized because all packets // If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for // were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client // some of the packets were not received by the client. The client
@ -1258,7 +1277,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private ReplicaInPipeline append(String bpid, private ReplicaInPipeline append(String bpid,
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen) ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
throws IOException { throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// If the block is cached, start uncaching it. // If the block is cached, start uncaching it.
if (replicaInfo.getState() != ReplicaState.FINALIZED) { if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new IOException("Only a Finalized replica can be appended to; " throw new IOException("Only a Finalized replica can be appended to; "
@ -1354,7 +1373,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (true) { while (true) {
try { try {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
ReplicaInPipeline replica; ReplicaInPipeline replica;
@ -1386,7 +1405,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.info("Recover failed close " + b); LOG.info("Recover failed close " + b);
while (true) { while (true) {
try { try {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// check replica's state // check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS // bump the replica's GS
@ -1408,7 +1427,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public ReplicaHandler createRbw( public ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b, StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException { boolean allowLazyPersist) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId()); b.getBlockId());
if (replicaInfo != null) { if (replicaInfo != null) {
@ -1479,7 +1498,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (true) { while (true) {
try { try {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = ReplicaInfo replicaInfo =
getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state // check the replica's state
@ -1504,7 +1523,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException { throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// check generation stamp // check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp(); long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() || if (replicaGenerationStamp < b.getGenerationStamp() ||
@ -1565,7 +1584,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public ReplicaInPipeline convertTemporaryToRbw( public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException { final ExtendedBlock b) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final long blockId = b.getBlockId(); final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp(); final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes(); final long visible = b.getNumBytes();
@ -1639,7 +1658,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo lastFoundReplicaInfo = null; ReplicaInfo lastFoundReplicaInfo = null;
boolean isInPipeline = false; boolean isInPipeline = false;
do { do {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo currentReplicaInfo = ReplicaInfo currentReplicaInfo =
volumeMap.get(b.getBlockPoolId(), b.getBlockId()); volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (currentReplicaInfo == lastFoundReplicaInfo) { if (currentReplicaInfo == lastFoundReplicaInfo) {
@ -1692,7 +1711,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }, invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
false); false);
} }
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
.getNumBytes()); .getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
@ -1743,7 +1762,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throws IOException { throws IOException {
ReplicaInfo replicaInfo = null; ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null; ReplicaInfo finalizedReplicaInfo = null;
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
if (Thread.interrupted()) { if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads // Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread"); throw new IOException("Cannot finalize block from Interrupted Thread");
@ -1774,7 +1793,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
throws IOException { throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// Compare generation stamp of old and new replica before finalizing // Compare generation stamp of old and new replica before finalizing
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp() if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
> replicaInfo.getGenerationStamp()) { > replicaInfo.getGenerationStamp()) {
@ -1819,7 +1838,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/ */
@Override // FsDatasetSpi @Override // FsDatasetSpi
public void unfinalizeBlock(ExtendedBlock b) throws IOException { public void unfinalizeBlock(ExtendedBlock b) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock()); b.getLocalBlock());
if (replicaInfo != null && if (replicaInfo != null &&
@ -1872,7 +1891,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new HashMap<String, BlockListAsLongs.Builder>(); new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = null; List<FsVolumeImpl> curVolumes = null;
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
curVolumes = volumes.getVolumes(); curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) { for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength)); builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@ -1927,7 +1946,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Gets a list of references to the finalized blocks for the given block pool. * Gets a list of references to the finalized blocks for the given block pool.
* <p> * <p>
* Callers of this function should call * Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being * {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
* changed during list iteration. * changed during list iteration.
* </p> * </p>
* @return a list of references to the finalized blocks for the given block * @return a list of references to the finalized blocks for the given block
@ -1935,7 +1954,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/ */
@Override @Override
public List<ReplicaInfo> getFinalizedBlocks(String bpid) { public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>( final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
volumeMap.size(bpid)); volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) { for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@ -2028,7 +2047,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo validateBlockFile(String bpid, long blockId) { ReplicaInfo validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too? //Should we check for metadata file too?
final ReplicaInfo r; final ReplicaInfo r;
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
r = volumeMap.get(bpid, blockId); r = volumeMap.get(bpid, blockId);
} }
if (r != null) { if (r != null) {
@ -2079,7 +2098,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
for (int i = 0; i < invalidBlks.length; i++) { for (int i = 0; i < invalidBlks.length; i++) {
final ReplicaInfo removing; final ReplicaInfo removing;
final FsVolumeImpl v; final FsVolumeImpl v;
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]); final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
if (info == null) { if (info == null) {
ReplicaInfo infoByBlockId = ReplicaInfo infoByBlockId =
@ -2205,7 +2224,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long length, genstamp; long length, genstamp;
Executor volumeExecutor; Executor volumeExecutor;
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo info = volumeMap.get(bpid, blockId); ReplicaInfo info = volumeMap.get(bpid, blockId);
boolean success = false; boolean success = false;
try { try {
@ -2273,7 +2292,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi @Override // FsDatasetSpi
public boolean contains(final ExtendedBlock block) { public boolean contains(final ExtendedBlock block) {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final long blockId = block.getLocalBlock().getBlockId(); final long blockId = block.getLocalBlock().getBlockId();
final String bpid = block.getBlockPoolId(); final String bpid = block.getBlockPoolId();
final ReplicaInfo r = volumeMap.get(bpid, blockId); final ReplicaInfo r = volumeMap.get(bpid, blockId);
@ -2393,7 +2412,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Block corruptBlock = null; Block corruptBlock = null;
ReplicaInfo memBlockInfo; ReplicaInfo memBlockInfo;
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
memBlockInfo = volumeMap.get(bpid, blockId); memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null && if (memBlockInfo != null &&
memBlockInfo.getState() != ReplicaState.FINALIZED) { memBlockInfo.getState() != ReplicaState.FINALIZED) {
@ -2594,7 +2613,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override @Override
public String getReplicaString(String bpid, long blockId) { public String getReplicaString(String bpid, long blockId) {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final Replica r = volumeMap.get(bpid, blockId); final Replica r = volumeMap.get(bpid, blockId);
return r == null ? "null" : r.toString(); return r == null ? "null" : r.toString();
} }
@ -2701,7 +2720,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final long recoveryId, final long recoveryId,
final long newBlockId, final long newBlockId,
final long newlength) throws IOException { final long newlength) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
//get replica //get replica
final String bpid = oldBlock.getBlockPoolId(); final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId()); final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@ -2814,7 +2833,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi @Override // FsDatasetSpi
public long getReplicaVisibleLength(final ExtendedBlock block) public long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException { throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final Replica replica = getReplicaInfo(block.getBlockPoolId(), final Replica replica = getReplicaInfo(block.getBlockPoolId(),
block.getBlockId()); block.getBlockId());
if (replica.getGenerationStamp() < block.getGenerationStamp()) { if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@ -2831,7 +2850,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throws IOException { throws IOException {
LOG.info("Adding block pool " + bpid); LOG.info("Adding block pool " + bpid);
AddBlockPoolException volumeExceptions = new AddBlockPoolException(); AddBlockPoolException volumeExceptions = new AddBlockPoolException();
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try { try {
volumes.addBlockPool(bpid, conf); volumes.addBlockPool(bpid, conf);
} catch (AddBlockPoolException e) { } catch (AddBlockPoolException e) {
@ -2861,7 +2880,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override @Override
public void shutdownBlockPool(String bpid) { public void shutdownBlockPool(String bpid) {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
LOG.info("Removing block pool " + bpid); LOG.info("Removing block pool " + bpid);
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
= getBlockReports(bpid); = getBlockReports(bpid);
@ -2935,7 +2954,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override //FsDatasetSpi @Override //FsDatasetSpi
public void deleteBlockPool(String bpid, boolean force) public void deleteBlockPool(String bpid, boolean force)
throws IOException { throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
List<FsVolumeImpl> curVolumes = volumes.getVolumes(); List<FsVolumeImpl> curVolumes = volumes.getVolumes();
if (!force) { if (!force) {
for (FsVolumeImpl volume : curVolumes) { for (FsVolumeImpl volume : curVolumes) {
@ -2964,7 +2983,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi @Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException { throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final Replica replica = volumeMap.get(block.getBlockPoolId(), final Replica replica = volumeMap.get(block.getBlockPoolId(),
block.getBlockId()); block.getBlockId());
if (replica == null) { if (replica == null) {
@ -3016,7 +3035,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override @Override
public void onCompleteLazyPersist(String bpId, long blockId, public void onCompleteLazyPersist(String bpId, long blockId,
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) { long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles); ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length() targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@ -3150,7 +3169,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
try { try {
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist(); block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) { if (block != null) {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
// If replicaInfo is null, the block was either deleted before // If replicaInfo is null, the block was either deleted before
@ -3217,7 +3236,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo replicaInfo, newReplicaInfo; ReplicaInfo replicaInfo, newReplicaInfo;
final String bpid = replicaState.getBlockPoolId(); final String bpid = replicaState.getBlockPoolId();
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
replicaState.getBlockId()); replicaState.getBlockId());
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@ -3390,7 +3409,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
void stopAllDataxceiverThreads(FsVolumeImpl volume) { void stopAllDataxceiverThreads(FsVolumeImpl volume) {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for (String bpid : volumeMap.getBlockPoolList()) { for (String bpid : volumeMap.getBlockPoolList()) {
Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid); Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
for (ReplicaInfo replicaInfo : replicas) { for (ReplicaInfo replicaInfo : replicas) {

View File

@ -28,6 +28,7 @@ import java.util.Set;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -53,9 +54,10 @@ import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.util.Timer;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader; import org.codehaus.jackson.map.ObjectReader;
@ -63,8 +65,6 @@ import org.codehaus.jackson.map.ObjectWriter;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES;
@ -135,7 +135,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume, ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
Configuration conf) { Configuration conf) {
this.providedVolume = volume; this.providedVolume = volume;
bpVolumeMap = new ReplicaMap(new AutoCloseableLock()); bpVolumeMap = new ReplicaMap(new ReentrantReadWriteLock());
Class<? extends BlockAliasMap> fmt = Class<? extends BlockAliasMap> fmt =
conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS, conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
TextFileRegionAliasMap.class, BlockAliasMap.class); TextFileRegionAliasMap.class, BlockAliasMap.class);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -31,23 +32,27 @@ import org.apache.hadoop.util.AutoCloseableLock;
* Maintains the replica map. * Maintains the replica map.
*/ */
class ReplicaMap { class ReplicaMap {
private final ReadWriteLock rwLock;
// Lock object to synchronize this instance. // Lock object to synchronize this instance.
private final AutoCloseableLock lock; private final AutoCloseableLock readLock;
private final AutoCloseableLock writeLock;
// Map of block pool Id to another map of block Id to ReplicaInfo. // Map of block pool Id to another map of block Id to ReplicaInfo.
private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map = private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
new HashMap<>(); new HashMap<>();
ReplicaMap(AutoCloseableLock lock) { ReplicaMap(ReadWriteLock lock) {
if (lock == null) { if (lock == null) {
throw new HadoopIllegalArgumentException( throw new HadoopIllegalArgumentException(
"Lock to synchronize on cannot be null"); "Lock to synchronize on cannot be null");
} }
this.lock = lock; this.rwLock = lock;
this.readLock = new AutoCloseableLock(rwLock.readLock());
this.writeLock = new AutoCloseableLock(rwLock.writeLock());
} }
String[] getBlockPoolList() { String[] getBlockPoolList() {
try (AutoCloseableLock l = lock.acquire()) { try (AutoCloseableLock l = writeLock.acquire()) {
return map.keySet().toArray(new String[map.keySet().size()]); return map.keySet().toArray(new String[map.keySet().size()]);
} }
} }
@ -92,7 +97,7 @@ class ReplicaMap {
*/ */
ReplicaInfo get(String bpid, long blockId) { ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid); checkBlockPool(bpid);
try (AutoCloseableLock l = lock.acquire()) { try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.get(new Block(blockId)) : null; return m != null ? m.get(new Block(blockId)) : null;
} }
@ -109,7 +114,7 @@ class ReplicaMap {
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) { ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid); checkBlockPool(bpid);
checkBlock(replicaInfo); checkBlock(replicaInfo);
try (AutoCloseableLock l = lock.acquire()) { try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) { if (m == null) {
// Add an entry for block pool if it does not exist already // Add an entry for block pool if it does not exist already
@ -127,7 +132,7 @@ class ReplicaMap {
ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) { ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid); checkBlockPool(bpid);
checkBlock(replicaInfo); checkBlock(replicaInfo);
try (AutoCloseableLock l = lock.acquire()) { try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) { if (m == null) {
// Add an entry for block pool if it does not exist already // Add an entry for block pool if it does not exist already
@ -176,7 +181,7 @@ class ReplicaMap {
ReplicaInfo remove(String bpid, Block block) { ReplicaInfo remove(String bpid, Block block) {
checkBlockPool(bpid); checkBlockPool(bpid);
checkBlock(block); checkBlock(block);
try (AutoCloseableLock l = lock.acquire()) { try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) { if (m != null) {
ReplicaInfo replicaInfo = m.get(block); ReplicaInfo replicaInfo = m.get(block);
@ -198,7 +203,7 @@ class ReplicaMap {
*/ */
ReplicaInfo remove(String bpid, long blockId) { ReplicaInfo remove(String bpid, long blockId) {
checkBlockPool(bpid); checkBlockPool(bpid);
try (AutoCloseableLock l = lock.acquire()) { try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) { if (m != null) {
return m.remove(new Block(blockId)); return m.remove(new Block(blockId));
@ -213,7 +218,7 @@ class ReplicaMap {
* @return the number of replicas in the map * @return the number of replicas in the map
*/ */
int size(String bpid) { int size(String bpid) {
try (AutoCloseableLock l = lock.acquire()) { try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.size() : 0; return m != null ? m.size() : 0;
} }
@ -237,7 +242,7 @@ class ReplicaMap {
void initBlockPool(String bpid) { void initBlockPool(String bpid) {
checkBlockPool(bpid); checkBlockPool(bpid);
try (AutoCloseableLock l = lock.acquire()) { try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) { if (m == null) {
// Add an entry for block pool if it does not exist already // Add an entry for block pool if it does not exist already
@ -249,7 +254,7 @@ class ReplicaMap {
void cleanUpBlockPool(String bpid) { void cleanUpBlockPool(String bpid) {
checkBlockPool(bpid); checkBlockPool(bpid);
try (AutoCloseableLock l = lock.acquire()) { try (AutoCloseableLock l = writeLock.acquire()) {
map.remove(bpid); map.remove(bpid);
} }
} }
@ -259,6 +264,6 @@ class ReplicaMap {
* @return lock object * @return lock object
*/ */
AutoCloseableLock getLock() { AutoCloseableLock getLock() {
return lock; return writeLock;
} }
} }

View File

@ -2999,6 +2999,27 @@
</description> </description>
</property> </property>
<property>
<name>dfs.datanode.lock.fair</name>
<value>true</value>
<description>If this is true, the Datanode FsDataset lock will be used in Fair
mode, which will help to prevent writer threads from being starved, but can
lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock
for more information on fair/non-fair locks.
</description>
</property>
<property>
<name>dfs.datanode.lock-reporting-threshold-ms</name>
<value>300</value>
<description>When thread waits to obtain a lock, or a thread holds a lock for
more than the threshold, a log message will be written. Note that
dfs.lock.suppress.warning.interval ensures a single log message is
emitted per interval for waiting threads and a single message for holding
threads to avoid excessive logging.
</description>
</property>
<property> <property>
<name>dfs.namenode.startup.delay.block.deletion.sec</name> <name>dfs.namenode.startup.delay.block.deletion.sec</name>
<value>0</value> <value>0</value>

View File

@ -1572,6 +1572,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
return datasetLock.acquire(); return datasetLock.acquire();
} }
@Override
public AutoCloseableLock acquireDatasetReadLock() {
// No RW lock implementation in simulated dataset currently.
return datasetLock.acquire();
}
@Override @Override
public Set<? extends Replica> deepCopyReplica(String bpid) public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException { throws IOException {

View File

@ -455,6 +455,11 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
return null; return null;
} }
@Override
public AutoCloseableLock acquireDatasetReadLock() {
return null;
}
@Override @Override
public Set<? extends Replica> deepCopyReplica(String bpid) public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException { throws IOException {

View File

@ -434,7 +434,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
@Override @Override
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException { public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
// Reload replicas from the disk. // Reload replicas from the disk.
ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock); ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock);
try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) { try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
for (FsVolumeSpi vol : refs) { for (FsVolumeSpi vol : refs) {
FsVolumeImpl volume = (FsVolumeImpl) vol; FsVolumeImpl volume = (FsVolumeImpl) vol;

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -53,6 +52,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -368,7 +368,7 @@ public class TestFsVolumeList {
fs.close(); fs.close();
FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0) FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
.getFSDataset(); .getFSDataset();
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock()); ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
.getInstance(conf, fsDataset); .getInstance(conf, fsDataset);
FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0); FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);

View File

@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.List; import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -59,7 +60,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -236,7 +236,7 @@ public class TestInterDatanodeProtocol {
final long firstblockid = 10000L; final long firstblockid = 10000L;
final long gs = 7777L; final long gs = 7777L;
final long length = 22L; final long length = 22L;
final ReplicaMap map = new ReplicaMap(new AutoCloseableLock()); final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
String bpid = "BP-TEST"; String bpid = "BP-TEST";
final Block[] blocks = new Block[5]; final Block[] blocks = new Block[5];
for(int i = 0; i < blocks.length; i++) { for(int i = 0; i < blocks.length; i++) {

View File

@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -77,7 +78,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -399,7 +399,7 @@ public class TestProvidedImpl {
public void testBlockLoad() throws IOException { public void testBlockLoad() throws IOException {
for (int i = 0; i < providedVolumes.size(); i++) { for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i); FsVolumeImpl vol = providedVolumes.get(i);
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock()); ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(volumeMap, null); vol.getVolumeMap(volumeMap, null);
assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length); assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
@ -475,7 +475,7 @@ public class TestProvidedImpl {
vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID], vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID],
new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId, new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId,
numBlocks)); numBlocks));
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock()); ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null); vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null);
totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]); totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
} }
@ -585,7 +585,7 @@ public class TestProvidedImpl {
public void testProvidedReplicaPrefix() throws Exception { public void testProvidedReplicaPrefix() throws Exception {
for (int i = 0; i < providedVolumes.size(); i++) { for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i); FsVolumeImpl vol = providedVolumes.get(i);
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock()); ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(volumeMap, null); vol.getVolumeMap(volumeMap, null);
Path expectedPrefix = new Path( Path expectedPrefix = new Path(

View File

@ -23,15 +23,16 @@ import static org.junit.Assert.fail;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.util.AutoCloseableLock;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* Unit test for ReplicasMap class * Unit test for ReplicasMap class
*/ */
public class TestReplicaMap { public class TestReplicaMap {
private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock()); private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
private final String bpid = "BP-TEST"; private final String bpid = "BP-TEST";
private final Block block = new Block(1234, 1234, 1234); private final Block block = new Block(1234, 1234, 1234);
@ -111,7 +112,7 @@ public class TestReplicaMap {
@Test @Test
public void testMergeAll() { public void testMergeAll() {
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock()); ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
Block tmpBlock = new Block(5678, 5678, 5678); Block tmpBlock = new Block(5678, 5678, 5678);
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null)); temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
@ -122,7 +123,7 @@ public class TestReplicaMap {
@Test @Test
public void testAddAll() { public void testAddAll() {
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock()); ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
Block tmpBlock = new Block(5678, 5678, 5678); Block tmpBlock = new Block(5678, 5678, 5678);
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null)); temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));

View File

@ -27,6 +27,7 @@ import java.io.RandomAccessFile;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
@ -47,7 +48,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -550,7 +550,7 @@ public class TestWriteToReplica {
bpList.size() == 2); bpList.size() == 2);
createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn)); createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock()); ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
oldReplicaMap.addAll(dataSet.volumeMap); oldReplicaMap.addAll(dataSet.volumeMap);
cluster.restartDataNode(0); cluster.restartDataNode(0);