HDFS-10682. Replace FsDatasetImpl object lock with a separate lock object. (Contributed by Chen Liang)
This commit is contained in:
parent
b89d79ca1d
commit
1fe08c919a
|
@ -46,6 +46,7 @@ import org.apache.hadoop.io.LongWritable;
|
|||
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.net.SocketOutputStream;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.htrace.core.Sampler;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
|
@ -240,7 +241,7 @@ class BlockSender implements java.io.Closeable {
|
|||
|
||||
final Replica replica;
|
||||
final long replicaVisibleLength;
|
||||
synchronized(datanode.data) {
|
||||
try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
|
||||
replica = getReplica(block, datanode);
|
||||
replicaVisibleLength = replica.getVisibleLength();
|
||||
}
|
||||
|
|
|
@ -196,6 +196,7 @@ import org.apache.hadoop.tracing.TraceAdminProtocolPB;
|
|||
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.tracing.TraceUtils;
|
||||
import org.apache.hadoop.tracing.TracerConfigurationManager;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
|
@ -2817,7 +2818,7 @@ public class DataNode extends ReconfigurableBase
|
|||
final BlockConstructionStage stage;
|
||||
|
||||
//get replica information
|
||||
synchronized(data) {
|
||||
try(AutoCloseableLock lock = data.acquireDatasetLock()) {
|
||||
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
|
||||
b.getBlockId());
|
||||
if (null == storedBlock) {
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
|
@ -581,7 +582,7 @@ public class DirectoryScanner implements Runnable {
|
|||
Map<String, ScanInfo[]> diskReport = getDiskReport();
|
||||
|
||||
// Hold FSDataset lock to prevent further changes to the block map
|
||||
synchronized(dataset) {
|
||||
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
|
||||
for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
|
||||
String bpid = entry.getKey();
|
||||
ScanInfo[] blockpoolReport = entry.getValue();
|
||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
|
@ -641,4 +642,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
* Confirm whether the block is deleting
|
||||
*/
|
||||
boolean isDeletingBlock(String bpid, long blockId);
|
||||
|
||||
/**
|
||||
* Acquire the lock of the dataset.
|
||||
*/
|
||||
AutoCloseableLock acquireDatasetLock();
|
||||
}
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
|
@ -178,14 +179,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
|
||||
final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
||||
public FsVolumeImpl getVolume(final ExtendedBlock b) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final ReplicaInfo r =
|
||||
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
||||
return r != null ? (FsVolumeImpl) r.getVolume() : null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized Block getStoredBlock(String bpid, long blkid)
|
||||
public Block getStoredBlock(String bpid, long blkid)
|
||||
throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
File blockfile = getFile(bpid, blkid, false);
|
||||
if (blockfile == null) {
|
||||
return null;
|
||||
|
@ -194,6 +199,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
|
||||
return new Block(blkid, blockfile.length(), gs);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
|
@ -262,6 +268,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
private boolean blockPinningEnabled;
|
||||
private final int maxDataLength;
|
||||
|
||||
private final AutoCloseableLock datasetLock;
|
||||
/**
|
||||
* An FSDataset has a directory where it loads its data files.
|
||||
*/
|
||||
|
@ -272,6 +279,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
this.dataStorage = storage;
|
||||
this.conf = conf;
|
||||
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
|
||||
this.datasetLock = new AutoCloseableLock();
|
||||
// The number of volumes required for operation is the total number
|
||||
// of volumes minus the number of failed volumes we can tolerate.
|
||||
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
|
||||
|
@ -378,10 +386,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
* Activate a volume to serve requests.
|
||||
* @throws IOException if the storage UUID already exists.
|
||||
*/
|
||||
private synchronized void activateVolume(
|
||||
private void activateVolume(
|
||||
ReplicaMap replicaMap,
|
||||
Storage.StorageDirectory sd, StorageType storageType,
|
||||
FsVolumeReference ref) throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
|
||||
if (dnStorage != null) {
|
||||
final String errorMsg = String.format(
|
||||
|
@ -398,6 +407,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
asyncDiskService.addVolume(sd.getCurrentDir());
|
||||
volumes.addVolume(ref);
|
||||
}
|
||||
}
|
||||
|
||||
private void addVolume(Collection<StorageLocation> dataLocations,
|
||||
Storage.StorageDirectory sd) throws IOException {
|
||||
|
@ -491,7 +501,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
|
||||
List<String> storageToRemove = new ArrayList<>();
|
||||
synchronized (this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
||||
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
||||
final File absRoot = sd.getRoot().getAbsoluteFile();
|
||||
|
@ -537,7 +547,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
for(String storageUuid : storageToRemove) {
|
||||
storageMap.remove(storageUuid);
|
||||
}
|
||||
|
@ -746,7 +756,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
boolean touch)
|
||||
throws IOException {
|
||||
final File f;
|
||||
synchronized(this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
|
||||
}
|
||||
if (f == null) {
|
||||
|
@ -812,14 +822,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
* Returns handles to the block file and its metadata file
|
||||
*/
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
||||
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
||||
long blkOffset, long metaOffset) throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo info = getReplicaInfo(b);
|
||||
FsVolumeReference ref = info.getVolume().obtainReference();
|
||||
try {
|
||||
InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
|
||||
try {
|
||||
InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset);
|
||||
InputStream metaInStream =
|
||||
openAndSeek(info.getMetaFile(), metaOffset);
|
||||
return new ReplicaInputStreams(blockInStream, metaInStream, ref);
|
||||
} catch (IOException e) {
|
||||
IOUtils.cleanup(null, blockInStream);
|
||||
|
@ -830,6 +842,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static FileInputStream openAndSeek(File file, long offset)
|
||||
throws IOException {
|
||||
|
@ -946,7 +959,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
FsVolumeReference volumeRef = null;
|
||||
synchronized (this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
|
||||
}
|
||||
try {
|
||||
|
@ -965,7 +978,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
newReplicaInfo.setNumBytes(blockFiles[1].length());
|
||||
// Finalize the copied files
|
||||
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
|
||||
synchronized (this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
// Increment numBlocks here as this block moved without knowing to BPS
|
||||
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
|
||||
volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
|
||||
|
@ -1097,8 +1110,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaHandler append(ExtendedBlock b,
|
||||
public ReplicaHandler append(ExtendedBlock b,
|
||||
long newGS, long expectedBlockLen) throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
// If the block was successfully finalized because all packets
|
||||
// were successfully processed at the Datanode but the ack for
|
||||
// some of the packets were not received by the client. The client
|
||||
|
@ -1125,14 +1139,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
|
||||
ReplicaBeingWritten replica = null;
|
||||
try {
|
||||
replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
|
||||
b.getNumBytes());
|
||||
replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
|
||||
newGS, b.getNumBytes());
|
||||
} catch (IOException e) {
|
||||
IOUtils.cleanup(null, ref);
|
||||
throw e;
|
||||
}
|
||||
return new ReplicaHandler(replica, ref);
|
||||
}
|
||||
}
|
||||
|
||||
/** Append to a finalized replica
|
||||
* Change a finalized replica to be a RBW replica and
|
||||
|
@ -1146,14 +1161,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
* @throws IOException if moving the replica from finalized directory
|
||||
* to rbw directory fails
|
||||
*/
|
||||
private synchronized ReplicaBeingWritten append(String bpid,
|
||||
private ReplicaBeingWritten append(String bpid,
|
||||
FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
|
||||
throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
// If the block is cached, start uncaching it.
|
||||
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
|
||||
|
||||
// If there are any hardlinks to the block, break them. This ensures we are
|
||||
// not appending to a file that is part of a previous/ directory.
|
||||
// If there are any hardlinks to the block, break them. This ensures we
|
||||
// are not appending to a file that is part of a previous/ directory.
|
||||
replicaInfo.breakHardLinksIfNeeded();
|
||||
|
||||
// construct a RBW replica with the new GS
|
||||
|
@ -1207,6 +1223,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
v.reserveSpaceForReplica(bytesReserved);
|
||||
return newReplicaInfo;
|
||||
}
|
||||
}
|
||||
|
||||
private static class MustStopExistingWriter extends Exception {
|
||||
private final ReplicaInPipeline rip;
|
||||
|
@ -1275,7 +1292,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
while (true) {
|
||||
try {
|
||||
synchronized (this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
||||
|
||||
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
|
||||
|
@ -1307,7 +1324,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
LOG.info("Recover failed close " + b);
|
||||
while (true) {
|
||||
try {
|
||||
synchronized (this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
// check replica's state
|
||||
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
||||
// bump the replica's GS
|
||||
|
@ -1354,9 +1371,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaHandler createRbw(
|
||||
public ReplicaHandler createRbw(
|
||||
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
|
||||
throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
||||
b.getBlockId());
|
||||
if (replicaInfo != null) {
|
||||
|
@ -1406,11 +1424,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
throw e;
|
||||
}
|
||||
|
||||
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
|
||||
ReplicaBeingWritten newReplicaInfo =
|
||||
new ReplicaBeingWritten(b.getBlockId(),
|
||||
b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
|
||||
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
||||
return new ReplicaHandler(newReplicaInfo, ref);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public ReplicaHandler recoverRbw(
|
||||
|
@ -1420,7 +1440,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
while (true) {
|
||||
try {
|
||||
synchronized (this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
|
||||
|
||||
// check the replica's state
|
||||
|
@ -1441,9 +1461,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
}
|
||||
|
||||
private synchronized ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
|
||||
private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
|
||||
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
|
||||
throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
// check generation stamp
|
||||
long replicaGenerationStamp = rbw.getGenerationStamp();
|
||||
if (replicaGenerationStamp < b.getGenerationStamp() ||
|
||||
|
@ -1484,10 +1505,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
return new ReplicaHandler(rbw, ref);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipeline convertTemporaryToRbw(
|
||||
public ReplicaInPipeline convertTemporaryToRbw(
|
||||
final ExtendedBlock b) throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final long blockId = b.getBlockId();
|
||||
final long expectedGs = b.getGenerationStamp();
|
||||
final long visible = b.getNumBytes();
|
||||
|
@ -1495,7 +1518,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
+ visible);
|
||||
|
||||
final ReplicaInPipeline temp;
|
||||
{
|
||||
|
||||
// get replica
|
||||
final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
|
||||
if (r == null) {
|
||||
|
@ -1508,7 +1531,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
"r.getState() != ReplicaState.TEMPORARY, r=" + r);
|
||||
}
|
||||
temp = (ReplicaInPipeline) r;
|
||||
}
|
||||
|
||||
// check generation stamp
|
||||
if (temp.getGenerationStamp() != expectedGs) {
|
||||
throw new ReplicaAlreadyExistsException(
|
||||
|
@ -1545,6 +1568,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
volumeMap.add(b.getBlockPoolId(), rbw);
|
||||
return rbw;
|
||||
}
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public ReplicaHandler createTemporary(
|
||||
|
@ -1553,7 +1577,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
|
||||
ReplicaInfo lastFoundReplicaInfo = null;
|
||||
do {
|
||||
synchronized (this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo currentReplicaInfo =
|
||||
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
|
||||
if (currentReplicaInfo == lastFoundReplicaInfo) {
|
||||
|
@ -1632,7 +1656,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
* Complete the block write!
|
||||
*/
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
|
||||
public void finalizeBlock(ExtendedBlock b) throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
if (Thread.interrupted()) {
|
||||
// Don't allow data modifications from interrupted threads
|
||||
throw new IOException("Cannot finalize block from Interrupted Thread");
|
||||
|
@ -1645,13 +1670,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
finalizeReplica(b.getBlockPoolId(), replicaInfo);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized FinalizedReplica finalizeReplica(String bpid,
|
||||
private FinalizedReplica finalizeReplica(String bpid,
|
||||
ReplicaInfo replicaInfo) throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
FinalizedReplica newReplicaInfo = null;
|
||||
if (replicaInfo.getState() == ReplicaState.RUR &&
|
||||
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
|
||||
ReplicaState.FINALIZED) {
|
||||
((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState()
|
||||
== ReplicaState.FINALIZED) {
|
||||
newReplicaInfo = (FinalizedReplica)
|
||||
((ReplicaUnderRecovery) replicaInfo).getOriginalReplica();
|
||||
} else {
|
||||
|
@ -1664,11 +1691,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
File dest = v.addFinalizedBlock(
|
||||
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
|
||||
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
||||
newReplicaInfo =
|
||||
new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
||||
|
||||
if (v.isTransientStorage()) {
|
||||
releaseLockedMemory(
|
||||
replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(),
|
||||
replicaInfo.getOriginalBytesReserved()
|
||||
- replicaInfo.getNumBytes(),
|
||||
false);
|
||||
ramDiskReplicaTracker.addReplica(
|
||||
bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
|
||||
|
@ -1679,15 +1708,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
return newReplicaInfo;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the temporary block file (if any)
|
||||
*/
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
|
||||
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
||||
b.getLocalBlock());
|
||||
if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
|
||||
if (replicaInfo != null
|
||||
&& replicaInfo.getState() == ReplicaState.TEMPORARY) {
|
||||
// remove from volumeMap
|
||||
volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
|
||||
|
||||
|
@ -1697,7 +1729,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
LOG.warn("Block " + b + " unfinalized and removed. ");
|
||||
}
|
||||
if (replicaInfo.getVolume().isTransientStorage()) {
|
||||
ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
|
||||
ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(),
|
||||
b.getBlockId(), true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1740,7 +1774,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
|
||||
}
|
||||
|
||||
synchronized(this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
||||
switch(b.getState()) {
|
||||
case FINALIZED:
|
||||
|
@ -1778,7 +1812,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
* Get the list of finalized blocks from in-memory blockmap for a block pool.
|
||||
*/
|
||||
@Override
|
||||
public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) {
|
||||
public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ArrayList<FinalizedReplica> finalized =
|
||||
new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
|
||||
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
||||
|
@ -1788,12 +1823,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
return finalized;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of finalized blocks from in-memory blockmap for a block pool.
|
||||
*/
|
||||
@Override
|
||||
public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
|
||||
public List<FinalizedReplica>
|
||||
getFinalizedBlocksOnPersistentStorage(String bpid) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ArrayList<FinalizedReplica> finalized =
|
||||
new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
|
||||
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
||||
|
@ -1804,6 +1842,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
return finalized;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a block is valid.
|
||||
|
@ -1878,7 +1917,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
File validateBlockFile(String bpid, long blockId) {
|
||||
//Should we check for metadata file too?
|
||||
final File f;
|
||||
synchronized(this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
f = getFile(bpid, blockId, false);
|
||||
}
|
||||
|
||||
|
@ -1927,7 +1966,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
for (int i = 0; i < invalidBlks.length; i++) {
|
||||
final File f;
|
||||
final FsVolumeImpl v;
|
||||
synchronized (this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
|
||||
if (info == null) {
|
||||
// It is okay if the block is not found -- it may be deleted earlier.
|
||||
|
@ -2038,7 +2077,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
long length, genstamp;
|
||||
Executor volumeExecutor;
|
||||
|
||||
synchronized (this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ReplicaInfo info = volumeMap.get(bpid, blockId);
|
||||
boolean success = false;
|
||||
try {
|
||||
|
@ -2105,10 +2144,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized boolean contains(final ExtendedBlock block) {
|
||||
public boolean contains(final ExtendedBlock block) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final long blockId = block.getLocalBlock().getBlockId();
|
||||
return getFile(block.getBlockPoolId(), blockId, false) != null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn the block identifier into a filename
|
||||
|
@ -2233,7 +2274,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
File diskMetaFile, FsVolumeSpi vol) throws IOException {
|
||||
Block corruptBlock = null;
|
||||
ReplicaInfo memBlockInfo;
|
||||
synchronized (this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
memBlockInfo = volumeMap.get(bpid, blockId);
|
||||
if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
|
||||
// Block is not finalized - ignore the difference
|
||||
|
@ -2389,10 +2430,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized String getReplicaString(String bpid, long blockId) {
|
||||
public String getReplicaString(String bpid, long blockId) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final Replica r = volumeMap.get(bpid, blockId);
|
||||
return r == null ? "null" : r.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
|
||||
|
@ -2484,11 +2527,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized Replica updateReplicaUnderRecovery(
|
||||
public Replica updateReplicaUnderRecovery(
|
||||
final ExtendedBlock oldBlock,
|
||||
final long recoveryId,
|
||||
final long newBlockId,
|
||||
final long newlength) throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
//get replica
|
||||
final String bpid = oldBlock.getBlockPoolId();
|
||||
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
|
||||
|
@ -2546,6 +2590,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
return finalized;
|
||||
}
|
||||
}
|
||||
|
||||
private FinalizedReplica updateReplicaUnderRecovery(
|
||||
String bpid,
|
||||
|
@ -2622,8 +2667,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
|
||||
public long getReplicaVisibleLength(final ExtendedBlock block)
|
||||
throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
|
||||
block.getBlockId());
|
||||
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
||||
|
@ -2633,12 +2679,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
return replica.getVisibleLength();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addBlockPool(String bpid, Configuration conf)
|
||||
throws IOException {
|
||||
LOG.info("Adding block pool " + bpid);
|
||||
synchronized(this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
volumes.addBlockPool(bpid, conf);
|
||||
volumeMap.initBlockPool(bpid);
|
||||
}
|
||||
|
@ -2646,12 +2693,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void shutdownBlockPool(String bpid) {
|
||||
public void shutdownBlockPool(String bpid) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
LOG.info("Removing block pool " + bpid);
|
||||
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume = getBlockReports(bpid);
|
||||
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume =
|
||||
getBlockReports(bpid);
|
||||
volumeMap.cleanUpBlockPool(bpid);
|
||||
volumes.removeBlockPool(bpid, blocksPerVolume);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Class for representing the Datanode volume information
|
||||
|
@ -2713,14 +2763,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override //FsDatasetSpi
|
||||
public synchronized void deleteBlockPool(String bpid, boolean force)
|
||||
public void deleteBlockPool(String bpid, boolean force)
|
||||
throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
|
||||
if (!force) {
|
||||
for (FsVolumeImpl volume : curVolumes) {
|
||||
try (FsVolumeReference ref = volume.obtainReference()) {
|
||||
if (!volume.isBPDirEmpty(bpid)) {
|
||||
LOG.warn(bpid + " has some block files, cannot delete unless forced");
|
||||
LOG.warn(bpid
|
||||
+ " has some block files, cannot delete unless forced");
|
||||
throw new IOException("Cannot delete block pool, "
|
||||
+ "it contains some block files");
|
||||
}
|
||||
|
@ -2737,11 +2789,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
|
||||
throws IOException {
|
||||
synchronized(this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
final Replica replica = volumeMap.get(block.getBlockPoolId(),
|
||||
block.getBlockId());
|
||||
if (replica == null) {
|
||||
|
@ -2834,7 +2887,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
@Override
|
||||
public void onCompleteLazyPersist(String bpId, long blockId,
|
||||
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
|
||||
synchronized (FsDatasetImpl.this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
|
||||
|
||||
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
|
||||
|
@ -2968,7 +3021,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
try {
|
||||
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
|
||||
if (block != null) {
|
||||
synchronized (FsDatasetImpl.this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
|
||||
|
||||
// If replicaInfo is null, the block was either deleted before
|
||||
|
@ -3038,7 +3091,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
long blockFileUsed, metaFileUsed;
|
||||
final String bpid = replicaState.getBlockPoolId();
|
||||
|
||||
synchronized (FsDatasetImpl.this) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
|
||||
replicaState.getBlockId());
|
||||
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
|
||||
|
@ -3148,6 +3201,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseableLock acquireDatasetLock() {
|
||||
return datasetLock.acquire();
|
||||
}
|
||||
|
||||
public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
|
||||
synchronized (deletingBlock) {
|
||||
Set<Long> s = deletingBlock.get(bpid);
|
||||
|
@ -3210,17 +3268,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
return cacheManager.reserve(bytesNeeded) > 0;
|
||||
}
|
||||
|
||||
synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
|
||||
void stopAllDataxceiverThreads(FsVolumeImpl volume) {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
for (String blockPoolId : volumeMap.getBlockPoolList()) {
|
||||
Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
|
||||
for (ReplicaInfo replicaInfo : replicas) {
|
||||
if (replicaInfo instanceof ReplicaInPipeline
|
||||
&& replicaInfo.getVolume().equals(volume)) {
|
||||
ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
|
||||
ReplicaInPipeline replicaInPipeline =
|
||||
(ReplicaInPipeline) replicaInfo;
|
||||
replicaInPipeline.interruptThread();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.CloseableReferenceCount;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -303,7 +304,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
|
||||
private void decDfsUsedAndNumBlocks(String bpid, long value,
|
||||
boolean blockFileDeleted) {
|
||||
synchronized(dataset) {
|
||||
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
|
||||
BlockPoolSlice bp = bpSlices.get(bpid);
|
||||
if (bp != null) {
|
||||
bp.decDfsUsed(value);
|
||||
|
@ -315,7 +316,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
}
|
||||
|
||||
void incDfsUsedAndNumBlocks(String bpid, long value) {
|
||||
synchronized (dataset) {
|
||||
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
|
||||
BlockPoolSlice bp = bpSlices.get(bpid);
|
||||
if (bp != null) {
|
||||
bp.incDfsUsed(value);
|
||||
|
@ -325,7 +326,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
}
|
||||
|
||||
void incDfsUsed(String bpid, long value) {
|
||||
synchronized(dataset) {
|
||||
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
|
||||
BlockPoolSlice bp = bpSlices.get(bpid);
|
||||
if (bp != null) {
|
||||
bp.incDfsUsed(value);
|
||||
|
@ -336,7 +337,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
@VisibleForTesting
|
||||
public long getDfsUsed() throws IOException {
|
||||
long dfsUsed = 0;
|
||||
synchronized(dataset) {
|
||||
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
|
||||
for(BlockPoolSlice s : bpSlices.values()) {
|
||||
dfsUsed += s.getDfsUsed();
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
/**
|
||||
|
@ -114,6 +115,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
private static final DatanodeStorage.State DEFAULT_STATE =
|
||||
DatanodeStorage.State.NORMAL;
|
||||
|
||||
private final AutoCloseableLock datasetLock;
|
||||
|
||||
static final byte[] nullCrcFileData;
|
||||
static {
|
||||
DataChecksum checksum = DataChecksum.newDataChecksum(
|
||||
|
@ -550,6 +553,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
|
||||
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
|
||||
this.volume = new SimulatedVolume(this.storage);
|
||||
this.datasetLock = new AutoCloseableLock();
|
||||
}
|
||||
|
||||
public synchronized void injectBlocks(String bpid,
|
||||
|
@ -1365,5 +1369,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
public boolean isDeletingBlock(String bpid, long blockId) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseableLock acquireDatasetLock() {
|
||||
return datasetLock.acquire();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.log4j.Level;
|
||||
|
@ -693,7 +694,7 @@ public class TestBlockRecovery {
|
|||
final RecoveringBlock recoveringBlock = new RecoveringBlock(
|
||||
block.getBlock(), locations, block.getBlock()
|
||||
.getGenerationStamp() + 1);
|
||||
synchronized (dataNode.data) {
|
||||
try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
|
||||
Thread.sleep(2000);
|
||||
dataNode.initReplicaRecovery(recoveringBlock);
|
||||
}
|
||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -109,7 +110,7 @@ public class TestDirectoryScanner {
|
|||
|
||||
/** Truncate a block file */
|
||||
private long truncateBlockFile() throws IOException {
|
||||
synchronized (fds) {
|
||||
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
|
||||
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
|
||||
File f = b.getBlockFile();
|
||||
File mf = b.getMetaFile();
|
||||
|
@ -134,7 +135,7 @@ public class TestDirectoryScanner {
|
|||
|
||||
/** Delete a block file */
|
||||
private long deleteBlockFile() {
|
||||
synchronized(fds) {
|
||||
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
|
||||
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
|
||||
File f = b.getBlockFile();
|
||||
File mf = b.getMetaFile();
|
||||
|
@ -150,7 +151,7 @@ public class TestDirectoryScanner {
|
|||
|
||||
/** Delete block meta file */
|
||||
private long deleteMetaFile() {
|
||||
synchronized(fds) {
|
||||
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
|
||||
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
|
||||
File file = b.getMetaFile();
|
||||
// Delete a metadata file
|
||||
|
@ -169,7 +170,7 @@ public class TestDirectoryScanner {
|
|||
* @throws IOException
|
||||
*/
|
||||
private void duplicateBlock(long blockId) throws IOException {
|
||||
synchronized (fds) {
|
||||
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
|
||||
ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
|
||||
try (FsDatasetSpi.FsVolumeReferences volumes =
|
||||
fds.getFsVolumeReferences()) {
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
|
||||
public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
|
||||
|
||||
|
@ -448,4 +449,9 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
|
|||
public boolean isDeletingBlock(String bpid, long blockId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseableLock acquireDatasetLock() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue