HDFS-15150. Introduce read write lock to Datanode. Contributed by Stephen O'Donnell and Ahmed Hussein
This commit is contained in:
parent
ab6b5681e8
commit
caa59d7a58
|
@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock {
|
||||||
private final Lock readLock;
|
private final Lock readLock;
|
||||||
private final Lock writeLock;
|
private final Lock writeLock;
|
||||||
|
|
||||||
InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
|
public InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
|
||||||
long minLoggingGapMs, long lockWarningThresholdMs) {
|
long minLoggingGapMs, long lockWarningThresholdMs) {
|
||||||
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
|
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
|
||||||
readLock = new InstrumentedReadLock(name, logger, readWriteLock,
|
readLock = new InstrumentedReadLock(name, logger, readWriteLock,
|
||||||
|
|
|
@ -452,6 +452,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
"dfs.lock.suppress.warning.interval";
|
"dfs.lock.suppress.warning.interval";
|
||||||
public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
|
public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
|
||||||
10000; //ms
|
10000; //ms
|
||||||
|
public static final String DFS_DATANODE_LOCK_FAIR_KEY =
|
||||||
|
"dfs.datanode.lock.fair";
|
||||||
|
public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
|
||||||
|
public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
|
||||||
|
"dfs.datanode.lock-reporting-threshold-ms";
|
||||||
|
public static final long
|
||||||
|
DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L;
|
||||||
|
|
||||||
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
|
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
|
||||||
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
|
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
|
||||||
|
|
|
@ -662,6 +662,12 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
||||||
*/
|
*/
|
||||||
AutoCloseableLock acquireDatasetLock();
|
AutoCloseableLock acquireDatasetLock();
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Acquire the read lock of the data set.
|
||||||
|
* @return The AutoClosable read lock instance.
|
||||||
|
*/
|
||||||
|
AutoCloseableLock acquireDatasetReadLock();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deep copy the replica info belonging to given block pool.
|
* Deep copy the replica info belonging to given block pool.
|
||||||
* @param bpid Specified block pool id.
|
* @param bpid Specified block pool id.
|
||||||
|
|
|
@ -42,6 +42,7 @@ import java.util.concurrent.ForkJoinPool;
|
||||||
import java.util.concurrent.ForkJoinTask;
|
import java.util.concurrent.ForkJoinTask;
|
||||||
import java.util.concurrent.RecursiveAction;
|
import java.util.concurrent.RecursiveAction;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -66,7 +67,6 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.MultipleIOException;
|
import org.apache.hadoop.io.MultipleIOException;
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.DataChecksum.Type;
|
import org.apache.hadoop.util.DataChecksum.Type;
|
||||||
import org.apache.hadoop.util.DiskChecker;
|
import org.apache.hadoop.util.DiskChecker;
|
||||||
|
@ -849,7 +849,7 @@ class BlockPoolSlice {
|
||||||
|
|
||||||
private boolean readReplicasFromCache(ReplicaMap volumeMap,
|
private boolean readReplicasFromCache(ReplicaMap volumeMap,
|
||||||
final RamDiskReplicaTracker lazyWriteReplicaMap) {
|
final RamDiskReplicaTracker lazyWriteReplicaMap) {
|
||||||
ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||||
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
|
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
|
||||||
// Check whether the file exists or not.
|
// Check whether the file exists or not.
|
||||||
if (!replicaFile.exists()) {
|
if (!replicaFile.exists()) {
|
||||||
|
|
|
@ -89,7 +89,7 @@ import org.apache.hadoop.util.Daemon;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||||
import org.apache.hadoop.util.InstrumentedLock;
|
import org.apache.hadoop.util.InstrumentedReadWriteLock;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.Timer;
|
import org.apache.hadoop.util.Timer;
|
||||||
|
@ -125,7 +125,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
/**************************************************
|
/**************************************************
|
||||||
* FSDataset manages a set of data blocks. Each block
|
* FSDataset manages a set of data blocks. Each block
|
||||||
|
@ -179,7 +179,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FsVolumeImpl getVolume(final ExtendedBlock b) {
|
public FsVolumeImpl getVolume(final ExtendedBlock b) {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
final ReplicaInfo r =
|
final ReplicaInfo r =
|
||||||
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
||||||
return r != null ? (FsVolumeImpl) r.getVolume() : null;
|
return r != null ? (FsVolumeImpl) r.getVolume() : null;
|
||||||
|
@ -189,7 +189,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public Block getStoredBlock(String bpid, long blkid)
|
public Block getStoredBlock(String bpid, long blkid)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
File blockfile = null;
|
File blockfile = null;
|
||||||
|
|
||||||
ReplicaInfo info = volumeMap.get(bpid, blkid);
|
ReplicaInfo info = volumeMap.get(bpid, blkid);
|
||||||
|
@ -210,7 +210,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
public Set<? extends Replica> deepCopyReplica(String bpid)
|
public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Set<? extends Replica> replicas = null;
|
Set<? extends Replica> replicas = null;
|
||||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
|
replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
|
||||||
EMPTY_SET : volumeMap.replicas(bpid));
|
EMPTY_SET : volumeMap.replicas(bpid));
|
||||||
}
|
}
|
||||||
|
@ -250,7 +250,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
final ReplicaInfo replicaInfo = getReplicaInfo(b);
|
final ReplicaInfo replicaInfo = getReplicaInfo(b);
|
||||||
if (replicaInfo != null) {
|
if (replicaInfo != null) {
|
||||||
volume = replicaInfo.getVolume();
|
volume = replicaInfo.getVolume();
|
||||||
|
@ -294,8 +294,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
private final int maxDataLength;
|
private final int maxDataLength;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
final AutoCloseableLock datasetLock;
|
final AutoCloseableLock datasetWriteLock;
|
||||||
private final Condition datasetLockCondition;
|
@VisibleForTesting
|
||||||
|
final AutoCloseableLock datasetReadLock;
|
||||||
|
@VisibleForTesting
|
||||||
|
final InstrumentedReadWriteLock datasetRWLock;
|
||||||
|
private final Condition datasetWriteLockCondition;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An FSDataset has a directory where it loads its data files.
|
* An FSDataset has a directory where it loads its data files.
|
||||||
|
@ -307,15 +311,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
this.dataStorage = storage;
|
this.dataStorage = storage;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
|
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
|
||||||
this.datasetLock = new AutoCloseableLock(
|
this.datasetRWLock = new InstrumentedReadWriteLock(
|
||||||
new InstrumentedLock(getClass().getName(), LOG,
|
conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
|
||||||
new ReentrantLock(true),
|
DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT),
|
||||||
conf.getTimeDuration(
|
"FsDatasetRWLock", LOG, conf.getTimeDuration(
|
||||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
||||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
|
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
|
||||||
TimeUnit.MILLISECONDS),
|
TimeUnit.MILLISECONDS),
|
||||||
300));
|
conf.getTimeDuration(
|
||||||
this.datasetLockCondition = datasetLock.newCondition();
|
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
|
||||||
|
TimeUnit.MILLISECONDS));
|
||||||
|
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
|
||||||
|
this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
|
||||||
|
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
|
||||||
|
|
||||||
// The number of volumes required for operation is the total number
|
// The number of volumes required for operation is the total number
|
||||||
// of volumes minus the number of failed volumes we can tolerate.
|
// of volumes minus the number of failed volumes we can tolerate.
|
||||||
|
@ -354,7 +363,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
}
|
}
|
||||||
|
|
||||||
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
|
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
|
||||||
volumeMap = new ReplicaMap(datasetLock);
|
volumeMap = new ReplicaMap(datasetRWLock);
|
||||||
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
|
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -444,7 +453,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
ReplicaMap replicaMap,
|
ReplicaMap replicaMap,
|
||||||
Storage.StorageDirectory sd, StorageType storageType,
|
Storage.StorageDirectory sd, StorageType storageType,
|
||||||
FsVolumeReference ref) throws IOException {
|
FsVolumeReference ref) throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
|
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
|
||||||
if (dnStorage != null) {
|
if (dnStorage != null) {
|
||||||
final String errorMsg = String.format(
|
final String errorMsg = String.format(
|
||||||
|
@ -475,7 +484,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
FsVolumeImpl fsVolume = new FsVolumeImpl(
|
FsVolumeImpl fsVolume = new FsVolumeImpl(
|
||||||
this, sd.getStorageUuid(), dir, this.conf, storageType);
|
this, sd.getStorageUuid(), dir, this.conf, storageType);
|
||||||
FsVolumeReference ref = fsVolume.obtainReference();
|
FsVolumeReference ref = fsVolume.obtainReference();
|
||||||
ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
|
ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
|
||||||
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
|
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
|
||||||
|
|
||||||
activateVolume(tempVolumeMap, sd, storageType, ref);
|
activateVolume(tempVolumeMap, sd, storageType, ref);
|
||||||
|
@ -509,7 +518,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
StorageType storageType = location.getStorageType();
|
StorageType storageType = location.getStorageType();
|
||||||
final FsVolumeImpl fsVolume =
|
final FsVolumeImpl fsVolume =
|
||||||
createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
|
createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
|
||||||
final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
|
final ReplicaMap tempVolumeMap =
|
||||||
|
new ReplicaMap(new ReentrantReadWriteLock());
|
||||||
ArrayList<IOException> exceptions = Lists.newArrayList();
|
ArrayList<IOException> exceptions = Lists.newArrayList();
|
||||||
|
|
||||||
for (final NamespaceInfo nsInfo : nsInfos) {
|
for (final NamespaceInfo nsInfo : nsInfos) {
|
||||||
|
@ -542,7 +552,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes a set of volumes from FsDataset.
|
* Removes a set of volumes from FsDataset.
|
||||||
* @param volumesToRemove a set of absolute root path of each volume.
|
* @param storageLocsToRemove a set of
|
||||||
|
* {@link StorageLocation}s for each volume.
|
||||||
* @param clearFailure set true to clear failure information.
|
* @param clearFailure set true to clear failure information.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@ -558,7 +569,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
|
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
|
||||||
List<String> storageToRemove = new ArrayList<>();
|
List<String> storageToRemove = new ArrayList<>();
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
||||||
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
||||||
final File absRoot = sd.getRoot().getAbsoluteFile();
|
final File absRoot = sd.getRoot().getAbsoluteFile();
|
||||||
|
@ -568,7 +579,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
// Disable the volume from the service.
|
// Disable the volume from the service.
|
||||||
asyncDiskService.removeVolume(sd.getCurrentDir());
|
asyncDiskService.removeVolume(sd.getCurrentDir());
|
||||||
volumes.removeVolume(absRoot, clearFailure);
|
volumes.removeVolume(absRoot, clearFailure);
|
||||||
volumes.waitVolumeRemoved(5000, datasetLockCondition);
|
volumes.waitVolumeRemoved(5000, datasetWriteLockCondition);
|
||||||
|
|
||||||
// Removed all replica information for the blocks on the volume.
|
// Removed all replica information for the blocks on the volume.
|
||||||
// Unlike updating the volumeMap in addVolume(), this operation does
|
// Unlike updating the volumeMap in addVolume(), this operation does
|
||||||
|
@ -616,7 +627,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
for(String storageUuid : storageToRemove) {
|
for(String storageUuid : storageToRemove) {
|
||||||
storageMap.remove(storageUuid);
|
storageMap.remove(storageUuid);
|
||||||
}
|
}
|
||||||
|
@ -814,7 +825,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
public InputStream getBlockInputStream(ExtendedBlock b,
|
public InputStream getBlockInputStream(ExtendedBlock b,
|
||||||
long seekOffset) throws IOException {
|
long seekOffset) throws IOException {
|
||||||
ReplicaInfo info;
|
ReplicaInfo info;
|
||||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -904,7 +915,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
||||||
long blkOffset, long metaOffset) throws IOException {
|
long blkOffset, long metaOffset) throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
final ReplicaInfo info = getReplicaInfo(b);
|
final ReplicaInfo info = getReplicaInfo(b);
|
||||||
final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
|
final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
|
||||||
FsVolumeReference ref = info.getVolume().obtainReference();
|
FsVolumeReference ref = info.getVolume().obtainReference();
|
||||||
|
@ -1026,7 +1037,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
}
|
}
|
||||||
|
|
||||||
FsVolumeReference volumeRef = null;
|
FsVolumeReference volumeRef = null;
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
|
volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
@ -1045,7 +1056,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
newReplicaInfo.setNumBytes(blockFiles[1].length());
|
newReplicaInfo.setNumBytes(blockFiles[1].length());
|
||||||
// Finalize the copied files
|
// Finalize the copied files
|
||||||
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
|
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
// Increment numBlocks here as this block moved without knowing to BPS
|
// Increment numBlocks here as this block moved without knowing to BPS
|
||||||
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
|
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
|
||||||
volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
|
volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
|
||||||
|
@ -1190,7 +1201,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public ReplicaHandler append(ExtendedBlock b,
|
public ReplicaHandler append(ExtendedBlock b,
|
||||||
long newGS, long expectedBlockLen) throws IOException {
|
long newGS, long expectedBlockLen) throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
// If the block was successfully finalized because all packets
|
// If the block was successfully finalized because all packets
|
||||||
// were successfully processed at the Datanode but the ack for
|
// were successfully processed at the Datanode but the ack for
|
||||||
// some of the packets were not received by the client. The client
|
// some of the packets were not received by the client. The client
|
||||||
|
@ -1242,7 +1253,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
private ReplicaBeingWritten append(String bpid,
|
private ReplicaBeingWritten append(String bpid,
|
||||||
FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
|
FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
// If the block is cached, start uncaching it.
|
// If the block is cached, start uncaching it.
|
||||||
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
|
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
|
||||||
|
|
||||||
|
@ -1378,7 +1389,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
||||||
|
|
||||||
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
|
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
|
||||||
|
@ -1410,7 +1421,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
LOG.info("Recover failed close " + b);
|
LOG.info("Recover failed close " + b);
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
// check replica's state
|
// check replica's state
|
||||||
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
||||||
// bump the replica's GS
|
// bump the replica's GS
|
||||||
|
@ -1461,7 +1472,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
public ReplicaHandler createRbw(
|
public ReplicaHandler createRbw(
|
||||||
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
|
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
||||||
b.getBlockId());
|
b.getBlockId());
|
||||||
if (replicaInfo != null) {
|
if (replicaInfo != null) {
|
||||||
|
@ -1527,7 +1538,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
|
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
|
||||||
|
|
||||||
// check the replica's state
|
// check the replica's state
|
||||||
|
@ -1551,7 +1562,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
|
private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
|
||||||
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
|
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
// check generation stamp
|
// check generation stamp
|
||||||
long replicaGenerationStamp = rbw.getGenerationStamp();
|
long replicaGenerationStamp = rbw.getGenerationStamp();
|
||||||
if (replicaGenerationStamp < b.getGenerationStamp() ||
|
if (replicaGenerationStamp < b.getGenerationStamp() ||
|
||||||
|
@ -1613,7 +1624,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public ReplicaInPipeline convertTemporaryToRbw(
|
public ReplicaInPipeline convertTemporaryToRbw(
|
||||||
final ExtendedBlock b) throws IOException {
|
final ExtendedBlock b) throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
final long blockId = b.getBlockId();
|
final long blockId = b.getBlockId();
|
||||||
final long expectedGs = b.getGenerationStamp();
|
final long expectedGs = b.getGenerationStamp();
|
||||||
final long visible = b.getNumBytes();
|
final long visible = b.getNumBytes();
|
||||||
|
@ -1687,7 +1698,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
ReplicaInfo lastFoundReplicaInfo = null;
|
ReplicaInfo lastFoundReplicaInfo = null;
|
||||||
boolean isInPipeline = false;
|
boolean isInPipeline = false;
|
||||||
do {
|
do {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
ReplicaInfo currentReplicaInfo =
|
ReplicaInfo currentReplicaInfo =
|
||||||
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
|
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
|
||||||
if (currentReplicaInfo == lastFoundReplicaInfo) {
|
if (currentReplicaInfo == lastFoundReplicaInfo) {
|
||||||
|
@ -1733,7 +1744,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
|
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
|
||||||
false);
|
false);
|
||||||
}
|
}
|
||||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
FsVolumeReference ref = volumes.getNextVolume(storageType, b
|
FsVolumeReference ref = volumes.getNextVolume(storageType, b
|
||||||
.getNumBytes());
|
.getNumBytes());
|
||||||
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
|
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
|
||||||
|
@ -1787,7 +1798,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ReplicaInfo replicaInfo = null;
|
ReplicaInfo replicaInfo = null;
|
||||||
ReplicaInfo finalizedReplicaInfo = null;
|
ReplicaInfo finalizedReplicaInfo = null;
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
if (Thread.interrupted()) {
|
if (Thread.interrupted()) {
|
||||||
// Don't allow data modifications from interrupted threads
|
// Don't allow data modifications from interrupted threads
|
||||||
throw new IOException("Cannot finalize block from Interrupted Thread");
|
throw new IOException("Cannot finalize block from Interrupted Thread");
|
||||||
|
@ -1817,7 +1828,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
private FinalizedReplica finalizeReplica(String bpid, ReplicaInfo replicaInfo)
|
private FinalizedReplica finalizeReplica(String bpid, ReplicaInfo replicaInfo)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
FinalizedReplica newReplicaInfo = null;
|
FinalizedReplica newReplicaInfo = null;
|
||||||
if (replicaInfo.getState() == ReplicaState.RUR &&
|
if (replicaInfo.getState() == ReplicaState.RUR &&
|
||||||
((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState()
|
((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState()
|
||||||
|
@ -1871,7 +1882,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
*/
|
*/
|
||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
|
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
||||||
b.getLocalBlock());
|
b.getLocalBlock());
|
||||||
if (replicaInfo != null
|
if (replicaInfo != null
|
||||||
|
@ -1926,7 +1937,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
new HashMap<String, BlockListAsLongs.Builder>();
|
new HashMap<String, BlockListAsLongs.Builder>();
|
||||||
|
|
||||||
List<FsVolumeImpl> curVolumes = null;
|
List<FsVolumeImpl> curVolumes = null;
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
curVolumes = volumes.getVolumes();
|
curVolumes = volumes.getVolumes();
|
||||||
for (FsVolumeSpi v : curVolumes) {
|
for (FsVolumeSpi v : curVolumes) {
|
||||||
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
|
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
|
||||||
|
@ -1983,7 +1994,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
* Gets a list of references to the finalized blocks for the given block pool.
|
* Gets a list of references to the finalized blocks for the given block pool.
|
||||||
* <p>
|
* <p>
|
||||||
* Callers of this function should call
|
* Callers of this function should call
|
||||||
* {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
|
* {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
|
||||||
* changed during list iteration.
|
* changed during list iteration.
|
||||||
* </p>
|
* </p>
|
||||||
* @return a list of references to the finalized blocks for the given block
|
* @return a list of references to the finalized blocks for the given block
|
||||||
|
@ -1991,7 +2002,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
|
public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
final ArrayList<FinalizedReplica> finalized =
|
final ArrayList<FinalizedReplica> finalized =
|
||||||
new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
|
new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
|
||||||
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
||||||
|
@ -2077,7 +2088,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
//Should we check for metadata file too?
|
//Should we check for metadata file too?
|
||||||
File f = null;
|
File f = null;
|
||||||
ReplicaInfo info;
|
ReplicaInfo info;
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
info = volumeMap.get(bpid, blockId);
|
info = volumeMap.get(bpid, blockId);
|
||||||
if (info != null) {
|
if (info != null) {
|
||||||
f = info.getBlockFile();
|
f = info.getBlockFile();
|
||||||
|
@ -2135,7 +2146,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
for (int i = 0; i < invalidBlks.length; i++) {
|
for (int i = 0; i < invalidBlks.length; i++) {
|
||||||
final File f;
|
final File f;
|
||||||
final FsVolumeImpl v;
|
final FsVolumeImpl v;
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
|
final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
|
||||||
if (info == null) {
|
if (info == null) {
|
||||||
ReplicaInfo infoByBlockId =
|
ReplicaInfo infoByBlockId =
|
||||||
|
@ -2258,7 +2269,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
long length, genstamp;
|
long length, genstamp;
|
||||||
Executor volumeExecutor;
|
Executor volumeExecutor;
|
||||||
|
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
ReplicaInfo info = volumeMap.get(bpid, blockId);
|
ReplicaInfo info = volumeMap.get(bpid, blockId);
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
|
@ -2326,7 +2337,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public boolean contains(final ExtendedBlock block) {
|
public boolean contains(final ExtendedBlock block) {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
final long blockId = block.getLocalBlock().getBlockId();
|
final long blockId = block.getLocalBlock().getBlockId();
|
||||||
return getFile(block.getBlockPoolId(), blockId, false) != null;
|
return getFile(block.getBlockPoolId(), blockId, false) != null;
|
||||||
}
|
}
|
||||||
|
@ -2456,7 +2467,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
File diskMetaFile, FsVolumeSpi vol) throws IOException {
|
File diskMetaFile, FsVolumeSpi vol) throws IOException {
|
||||||
Block corruptBlock = null;
|
Block corruptBlock = null;
|
||||||
ReplicaInfo memBlockInfo;
|
ReplicaInfo memBlockInfo;
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
memBlockInfo = volumeMap.get(bpid, blockId);
|
memBlockInfo = volumeMap.get(bpid, blockId);
|
||||||
if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
|
if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
|
||||||
// Block is not finalized - ignore the difference
|
// Block is not finalized - ignore the difference
|
||||||
|
@ -2621,7 +2632,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getReplicaString(String bpid, long blockId) {
|
public String getReplicaString(String bpid, long blockId) {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
final Replica r = volumeMap.get(bpid, blockId);
|
final Replica r = volumeMap.get(bpid, blockId);
|
||||||
return r == null ? "null" : r.toString();
|
return r == null ? "null" : r.toString();
|
||||||
}
|
}
|
||||||
|
@ -2726,7 +2737,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
final long recoveryId,
|
final long recoveryId,
|
||||||
final long newBlockId,
|
final long newBlockId,
|
||||||
final long newlength) throws IOException {
|
final long newlength) throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
//get replica
|
//get replica
|
||||||
final String bpid = oldBlock.getBlockPoolId();
|
final String bpid = oldBlock.getBlockPoolId();
|
||||||
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
|
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
|
||||||
|
@ -2868,7 +2879,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public long getReplicaVisibleLength(final ExtendedBlock block)
|
public long getReplicaVisibleLength(final ExtendedBlock block)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
|
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
|
||||||
block.getBlockId());
|
block.getBlockId());
|
||||||
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
||||||
|
@ -2884,7 +2895,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
public void addBlockPool(String bpid, Configuration conf)
|
public void addBlockPool(String bpid, Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
LOG.info("Adding block pool " + bpid);
|
LOG.info("Adding block pool " + bpid);
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
volumes.addBlockPool(bpid, conf);
|
volumes.addBlockPool(bpid, conf);
|
||||||
volumeMap.initBlockPool(bpid);
|
volumeMap.initBlockPool(bpid);
|
||||||
}
|
}
|
||||||
|
@ -2893,7 +2904,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void shutdownBlockPool(String bpid) {
|
public void shutdownBlockPool(String bpid) {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
LOG.info("Removing block pool " + bpid);
|
LOG.info("Removing block pool " + bpid);
|
||||||
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume =
|
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume =
|
||||||
getBlockReports(bpid);
|
getBlockReports(bpid);
|
||||||
|
@ -2967,7 +2978,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
@Override //FsDatasetSpi
|
@Override //FsDatasetSpi
|
||||||
public void deleteBlockPool(String bpid, boolean force)
|
public void deleteBlockPool(String bpid, boolean force)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
|
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
|
||||||
if (!force) {
|
if (!force) {
|
||||||
for (FsVolumeImpl volume : curVolumes) {
|
for (FsVolumeImpl volume : curVolumes) {
|
||||||
|
@ -2996,7 +3007,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
|
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
final Replica replica = volumeMap.get(block.getBlockPoolId(),
|
final Replica replica = volumeMap.get(block.getBlockPoolId(),
|
||||||
block.getBlockId());
|
block.getBlockId());
|
||||||
if (replica == null) {
|
if (replica == null) {
|
||||||
|
@ -3089,7 +3100,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
@Override
|
@Override
|
||||||
public void onCompleteLazyPersist(String bpId, long blockId,
|
public void onCompleteLazyPersist(String bpId, long blockId,
|
||||||
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
|
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
|
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
|
||||||
|
|
||||||
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
|
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
|
||||||
|
@ -3223,7 +3234,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
try {
|
try {
|
||||||
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
|
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
|
||||||
if (block != null) {
|
if (block != null) {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
|
replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
|
||||||
|
|
||||||
// If replicaInfo is null, the block was either deleted before
|
// If replicaInfo is null, the block was either deleted before
|
||||||
|
@ -3293,7 +3304,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
long blockFileUsed, metaFileUsed;
|
long blockFileUsed, metaFileUsed;
|
||||||
final String bpid = replicaState.getBlockPoolId();
|
final String bpid = replicaState.getBlockPoolId();
|
||||||
|
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
|
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
|
||||||
replicaState.getBlockId());
|
replicaState.getBlockId());
|
||||||
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
|
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
|
||||||
|
@ -3405,7 +3416,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AutoCloseableLock acquireDatasetLock() {
|
public AutoCloseableLock acquireDatasetLock() {
|
||||||
return datasetLock.acquire();
|
return datasetWriteLock.acquire();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AutoCloseableLock acquireDatasetReadLock() {
|
||||||
|
return datasetReadLock.acquire();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
|
public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
|
||||||
|
@ -3495,7 +3511,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
}
|
}
|
||||||
|
|
||||||
void stopAllDataxceiverThreads(FsVolumeImpl volume) {
|
void stopAllDataxceiverThreads(FsVolumeImpl volume) {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
||||||
for (String blockPoolId : volumeMap.getBlockPoolList()) {
|
for (String blockPoolId : volumeMap.getBlockPoolList()) {
|
||||||
Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
|
Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
|
||||||
for (ReplicaInfo replicaInfo : replicas) {
|
for (ReplicaInfo replicaInfo : replicas) {
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
@ -31,23 +32,27 @@ import org.apache.hadoop.util.AutoCloseableLock;
|
||||||
* Maintains the replica map.
|
* Maintains the replica map.
|
||||||
*/
|
*/
|
||||||
class ReplicaMap {
|
class ReplicaMap {
|
||||||
|
private final ReadWriteLock rwLock;
|
||||||
// Lock object to synchronize this instance.
|
// Lock object to synchronize this instance.
|
||||||
private final AutoCloseableLock lock;
|
private final AutoCloseableLock readLock;
|
||||||
|
private final AutoCloseableLock writeLock;
|
||||||
|
|
||||||
// Map of block pool Id to another map of block Id to ReplicaInfo.
|
// Map of block pool Id to another map of block Id to ReplicaInfo.
|
||||||
private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
|
private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
|
||||||
new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
|
new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
|
||||||
|
|
||||||
ReplicaMap(AutoCloseableLock lock) {
|
ReplicaMap(ReadWriteLock lock) {
|
||||||
if (lock == null) {
|
if (lock == null) {
|
||||||
throw new HadoopIllegalArgumentException(
|
throw new HadoopIllegalArgumentException(
|
||||||
"Lock to synchronize on cannot be null");
|
"Lock to synchronize on cannot be null");
|
||||||
}
|
}
|
||||||
this.lock = lock;
|
this.rwLock = lock;
|
||||||
|
this.readLock = new AutoCloseableLock(rwLock.readLock());
|
||||||
|
this.writeLock = new AutoCloseableLock(rwLock.writeLock());
|
||||||
}
|
}
|
||||||
|
|
||||||
String[] getBlockPoolList() {
|
String[] getBlockPoolList() {
|
||||||
try (AutoCloseableLock l = lock.acquire()) {
|
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||||
return map.keySet().toArray(new String[map.keySet().size()]);
|
return map.keySet().toArray(new String[map.keySet().size()]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -92,7 +97,7 @@ class ReplicaMap {
|
||||||
*/
|
*/
|
||||||
ReplicaInfo get(String bpid, long blockId) {
|
ReplicaInfo get(String bpid, long blockId) {
|
||||||
checkBlockPool(bpid);
|
checkBlockPool(bpid);
|
||||||
try (AutoCloseableLock l = lock.acquire()) {
|
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||||
return m != null ? m.get(new Block(blockId)) : null;
|
return m != null ? m.get(new Block(blockId)) : null;
|
||||||
}
|
}
|
||||||
|
@ -109,7 +114,7 @@ class ReplicaMap {
|
||||||
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
|
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
|
||||||
checkBlockPool(bpid);
|
checkBlockPool(bpid);
|
||||||
checkBlock(replicaInfo);
|
checkBlock(replicaInfo);
|
||||||
try (AutoCloseableLock l = lock.acquire()) {
|
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||||
if (m == null) {
|
if (m == null) {
|
||||||
// Add an entry for block pool if it does not exist already
|
// Add an entry for block pool if it does not exist already
|
||||||
|
@ -127,7 +132,7 @@ class ReplicaMap {
|
||||||
ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
|
ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
|
||||||
checkBlockPool(bpid);
|
checkBlockPool(bpid);
|
||||||
checkBlock(replicaInfo);
|
checkBlock(replicaInfo);
|
||||||
try (AutoCloseableLock l = lock.acquire()) {
|
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||||
if (m == null) {
|
if (m == null) {
|
||||||
// Add an entry for block pool if it does not exist already
|
// Add an entry for block pool if it does not exist already
|
||||||
|
@ -174,7 +179,7 @@ class ReplicaMap {
|
||||||
ReplicaInfo remove(String bpid, Block block) {
|
ReplicaInfo remove(String bpid, Block block) {
|
||||||
checkBlockPool(bpid);
|
checkBlockPool(bpid);
|
||||||
checkBlock(block);
|
checkBlock(block);
|
||||||
try (AutoCloseableLock l = lock.acquire()) {
|
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||||
if (m != null) {
|
if (m != null) {
|
||||||
ReplicaInfo replicaInfo = m.get(block);
|
ReplicaInfo replicaInfo = m.get(block);
|
||||||
|
@ -196,7 +201,7 @@ class ReplicaMap {
|
||||||
*/
|
*/
|
||||||
ReplicaInfo remove(String bpid, long blockId) {
|
ReplicaInfo remove(String bpid, long blockId) {
|
||||||
checkBlockPool(bpid);
|
checkBlockPool(bpid);
|
||||||
try (AutoCloseableLock l = lock.acquire()) {
|
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||||
if (m != null) {
|
if (m != null) {
|
||||||
return m.remove(new Block(blockId));
|
return m.remove(new Block(blockId));
|
||||||
|
@ -212,7 +217,7 @@ class ReplicaMap {
|
||||||
*/
|
*/
|
||||||
int size(String bpid) {
|
int size(String bpid) {
|
||||||
LightWeightResizableGSet<Block, ReplicaInfo> m = null;
|
LightWeightResizableGSet<Block, ReplicaInfo> m = null;
|
||||||
try (AutoCloseableLock l = lock.acquire()) {
|
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||||
m = map.get(bpid);
|
m = map.get(bpid);
|
||||||
return m != null ? m.size() : 0;
|
return m != null ? m.size() : 0;
|
||||||
}
|
}
|
||||||
|
@ -236,7 +241,7 @@ class ReplicaMap {
|
||||||
|
|
||||||
void initBlockPool(String bpid) {
|
void initBlockPool(String bpid) {
|
||||||
checkBlockPool(bpid);
|
checkBlockPool(bpid);
|
||||||
try (AutoCloseableLock l = lock.acquire()) {
|
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||||
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
|
||||||
if (m == null) {
|
if (m == null) {
|
||||||
// Add an entry for block pool if it does not exist already
|
// Add an entry for block pool if it does not exist already
|
||||||
|
@ -248,7 +253,7 @@ class ReplicaMap {
|
||||||
|
|
||||||
void cleanUpBlockPool(String bpid) {
|
void cleanUpBlockPool(String bpid) {
|
||||||
checkBlockPool(bpid);
|
checkBlockPool(bpid);
|
||||||
try (AutoCloseableLock l = lock.acquire()) {
|
try (AutoCloseableLock l = writeLock.acquire()) {
|
||||||
map.remove(bpid);
|
map.remove(bpid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -258,6 +263,6 @@ class ReplicaMap {
|
||||||
* @return lock object
|
* @return lock object
|
||||||
*/
|
*/
|
||||||
AutoCloseableLock getLock() {
|
AutoCloseableLock getLock() {
|
||||||
return lock;
|
return writeLock;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3036,6 +3036,27 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.datanode.lock.fair</name>
|
||||||
|
<value>true</value>
|
||||||
|
<description>If this is true, the Datanode FsDataset lock will be used in Fair
|
||||||
|
mode, which will help to prevent writer threads from being starved, but can
|
||||||
|
lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock
|
||||||
|
for more information on fair/non-fair locks.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.datanode.lock-reporting-threshold-ms</name>
|
||||||
|
<value>300</value>
|
||||||
|
<description>When thread waits to obtain a lock, or a thread holds a lock for
|
||||||
|
more than the threshold, a log message will be written. Note that
|
||||||
|
dfs.lock.suppress.warning.interval ensures a single log message is
|
||||||
|
emitted per interval for waiting threads and a single message for holding
|
||||||
|
threads to avoid excessive logging.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.startup.delay.block.deletion.sec</name>
|
<name>dfs.namenode.startup.delay.block.deletion.sec</name>
|
||||||
<value>0</value>
|
<value>0</value>
|
||||||
|
|
|
@ -1528,6 +1528,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
return datasetLock.acquire();
|
return datasetLock.acquire();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AutoCloseableLock acquireDatasetReadLock() {
|
||||||
|
// No RW lock implementation in simulated dataset currently.
|
||||||
|
return datasetLock.acquire();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<? extends Replica> deepCopyReplica(String bpid)
|
public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -450,6 +450,11 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AutoCloseableLock acquireDatasetReadLock() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<? extends Replica> deepCopyReplica(String bpid)
|
public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -442,7 +442,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
|
||||||
@Override
|
@Override
|
||||||
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
|
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
|
||||||
// Reload replicas from the disk.
|
// Reload replicas from the disk.
|
||||||
ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock);
|
ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock);
|
||||||
try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
|
try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
|
||||||
for (FsVolumeSpi vol : refs) {
|
for (FsVolumeSpi vol : refs) {
|
||||||
FsVolumeImpl volume = (FsVolumeImpl) vol;
|
FsVolumeImpl volume = (FsVolumeImpl) vol;
|
||||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -48,6 +47,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
@ -291,7 +291,7 @@ public class TestFsVolumeList {
|
||||||
fs.close();
|
fs.close();
|
||||||
FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
|
FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
|
||||||
.getFSDataset();
|
.getFSDataset();
|
||||||
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
|
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||||
RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
|
RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
|
||||||
.getInstance(conf, fsDataset);
|
.getInstance(conf, fsDataset);
|
||||||
FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
|
FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -59,7 +60,6 @@ import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -236,7 +236,7 @@ public class TestInterDatanodeProtocol {
|
||||||
final long firstblockid = 10000L;
|
final long firstblockid = 10000L;
|
||||||
final long gs = 7777L;
|
final long gs = 7777L;
|
||||||
final long length = 22L;
|
final long length = 22L;
|
||||||
final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
|
final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
|
||||||
String bpid = "BP-TEST";
|
String bpid = "BP-TEST";
|
||||||
final Block[] blocks = new Block[5];
|
final Block[] blocks = new Block[5];
|
||||||
for(int i = 0; i < blocks.length; i++) {
|
for(int i = 0; i < blocks.length; i++) {
|
||||||
|
|
|
@ -17,13 +17,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -31,7 +32,7 @@ import org.junit.Test;
|
||||||
* Unit test for ReplicasMap class
|
* Unit test for ReplicasMap class
|
||||||
*/
|
*/
|
||||||
public class TestReplicaMap {
|
public class TestReplicaMap {
|
||||||
private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
|
private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
|
||||||
private final String bpid = "BP-TEST";
|
private final String bpid = "BP-TEST";
|
||||||
private final Block block = new Block(1234, 1234, 1234);
|
private final Block block = new Block(1234, 1234, 1234);
|
||||||
|
|
||||||
|
@ -111,7 +112,7 @@ public class TestReplicaMap {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMergeAll() {
|
public void testMergeAll() {
|
||||||
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||||
Block tmpBlock = new Block(5678, 5678, 5678);
|
Block tmpBlock = new Block(5678, 5678, 5678);
|
||||||
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
|
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
|
||||||
|
|
||||||
|
@ -122,7 +123,7 @@ public class TestReplicaMap {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAddAll() {
|
public void testAddAll() {
|
||||||
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||||
Block tmpBlock = new Block(5678, 5678, 5678);
|
Block tmpBlock = new Block(5678, 5678, 5678);
|
||||||
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
|
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.io.RandomAccessFile;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
@ -46,7 +47,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -545,7 +545,7 @@ public class TestWriteToReplica {
|
||||||
bpList.size() == 2);
|
bpList.size() == 2);
|
||||||
|
|
||||||
createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
|
createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
|
||||||
ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
|
||||||
oldReplicaMap.addAll(dataSet.volumeMap);
|
oldReplicaMap.addAll(dataSet.volumeMap);
|
||||||
|
|
||||||
cluster.restartDataNode(0);
|
cluster.restartDataNode(0);
|
||||||
|
|
Loading…
Reference in New Issue