HDFS-10682. Replace FsDatasetImpl object lock with a separate lock object. (Chen Liang)

Arpit Agarwal 2016-08-08 12:02:53 -07:00
parent 625585950a
commit 8c0638471f
10 changed files with 535 additions and 446 deletions
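
Every hunk below applies the same mechanical transformation: a block that used to synchronize on the dataset object now acquires a dedicated lock in a try-with-resources header. A minimal sketch of the idea behind org.apache.hadoop.util.AutoCloseableLock, assuming only that it wraps a ReentrantLock, returns itself from acquire(), and unlocks in close():

import java.util.concurrent.locks.ReentrantLock;

public class AutoCloseableLock implements AutoCloseable {
  private final ReentrantLock lock = new ReentrantLock();

  /** Block until the lock is held, then return this for try-with-resources. */
  public AutoCloseableLock acquire() {
    lock.lock();
    return this;
  }

  /** Runs implicitly when the try block exits, normally or exceptionally. */
  @Override
  public void close() {
    lock.unlock();
  }
}

A caller written as

  try (AutoCloseableLock l = datasetLock.acquire()) {
    // critical section; the lock is released on every exit path
  }

gets the same mutual exclusion and exception safety as a synchronized block, while the lock becomes an ordinary object that can be exposed through an interface (see the FsDatasetSpi hunk below) and later swapped for a finer-grained lock without touching call sites.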

DataNode.java

@@ -115,6 +115,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.net.DomainPeerServer;
@@ -2877,7 +2878,7 @@ public class DataNode extends ReconfigurableBase
     final BlockConstructionStage stage;

     //get replica information
-    synchronized(data) {
+    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {

DirectoryScanner.java

@@ -44,6 +44,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -583,7 +584,7 @@ public class DirectoryScanner implements Runnable {
     Map<String, ScanInfo[]> diskReport = getDiskReport();

     // Hold FSDataset lock to prevent further changes to the block map
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
         String bpid = entry.getKey();
         ScanInfo[] blockpoolReport = entry.getValue();

DiskBalancer.java

@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
@@ -454,7 +455,7 @@ public class DiskBalancer {
     Map<String, FsVolumeSpi> pathMap = new HashMap<>();
     FsDatasetSpi.FsVolumeReferences references;
     try {
-      synchronized (this.dataset) {
+      try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);

FsDatasetSpi.java

@@ -35,6 +35,7 @@ import java.util.Set;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -639,4 +640,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
       FsVolumeSpi destination) throws IOException;
+
+  /**
+   * Acquire the lock of the data set.
+   */
+  AutoCloseableLock acquireDatasetLock();
 }
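
With acquireDatasetLock() on the FsDatasetSpi contract, components that previously synchronized on a concrete dataset instance (DataNode, DirectoryScanner, DiskBalancer, FsVolumeImpl, the tests) can take the lock through the interface alone. A sketch of the caller-side pattern; withDatasetLock is a hypothetical helper, not part of this change:

static void withDatasetLock(FsDatasetSpi<?> dataset, Runnable action) {
  // Works against any implementation, real or simulated.
  try (AutoCloseableLock lock = dataset.acquireDatasetLock()) {
    action.run(); // executes with the dataset lock held
  }
}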

FsDatasetImpl.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -175,14 +176,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }

   @Override
-  public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
-    final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
+  public FsVolumeImpl getVolume(final ExtendedBlock b) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final ReplicaInfo r =
+          volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     return r != null ? (FsVolumeImpl) r.getVolume() : null;
   }
+  }

   @Override // FsDatasetSpi
-  public synchronized Block getStoredBlock(String bpid, long blkid)
+  public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     File blockfile = getFile(bpid, blkid, false);
     if (blockfile == null) {
       return null;
@@ -191,6 +196,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
     return new Block(blkid, blockfile.length(), gs);
   }
+  }

   /**
@@ -259,6 +265,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private boolean blockPinningEnabled;
   private final int maxDataLength;

+  private final AutoCloseableLock datasetLock;
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -269,6 +277,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
+    this.datasetLock = new AutoCloseableLock();
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
@@ -341,6 +350,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
   }

+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return datasetLock.acquire();
+  }
+
   /**
    * Gets initial volume failure information for all volumes that failed
    * immediately at startup. The method works by determining the set difference
@@ -375,10 +389,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Activate a volume to serve requests.
    * @throws IOException if the storage UUID already exists.
    */
-  private synchronized void activateVolume(
+  private void activateVolume(
       ReplicaMap replicaMap,
       Storage.StorageDirectory sd, StorageType storageType,
       FsVolumeReference ref) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
     if (dnStorage != null) {
       final String errorMsg = String.format(
@@ -395,6 +410,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       asyncDiskService.addVolume(sd.getCurrentDir());
       volumes.addVolume(ref);
     }
+  }

   private void addVolume(Collection<StorageLocation> dataLocations,
       Storage.StorageDirectory sd) throws IOException {
@@ -488,7 +504,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
     List<String> storageToRemove = new ArrayList<>();
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
         Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
         final File absRoot = sd.getRoot().getAbsoluteFile();
@@ -534,7 +550,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }

-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for(String storageUuid : storageToRemove) {
         storageMap.remove(storageUuid);
       }
@@ -743,7 +759,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       boolean touch)
       throws IOException {
     final File f;
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
     }
     if (f == null) {
@@ -809,14 +825,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Returns handles to the block file and its metadata file
    */
   @Override // FsDatasetSpi
-  public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
+  public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     ReplicaInfo info = getReplicaInfo(b);
     FsVolumeReference ref = info.getVolume().obtainReference();
     try {
       InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
       try {
-        InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset);
+        InputStream metaInStream =
+            openAndSeek(info.getMetaFile(), metaOffset);
         return new ReplicaInputStreams(blockInStream, metaInStream, ref);
       } catch (IOException e) {
         IOUtils.cleanup(null, blockInStream);
@@ -827,6 +845,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throw e;
     }
   }
+  }

   private static FileInputStream openAndSeek(File file, long offset)
       throws IOException {
@@ -943,7 +962,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }

     FsVolumeReference volumeRef = null;
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
     }
     try {
@@ -985,7 +1004,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     newReplicaInfo.setNumBytes(blockFiles[1].length());
     // Finalize the copied files
     newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       // Increment numBlocks here as this block moved without knowing to BPS
       FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
       volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
@@ -1015,7 +1034,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     FsVolumeReference volumeRef = null;

-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       volumeRef = destination.obtainReference();
     }

@@ -1143,8 +1162,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {

   @Override // FsDatasetSpi
-  public synchronized ReplicaHandler append(ExtendedBlock b,
+  public ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     // If the block was successfully finalized because all packets
     // were successfully processed at the Datanode but the ack for
     // some of the packets were not received by the client. The client
@@ -1171,7 +1191,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
       ReplicaBeingWritten replica = null;
       try {
-        replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
+        replica = append(b.getBlockPoolId(),
+            (FinalizedReplica) replicaInfo, newGS,
             b.getNumBytes());
       } catch (IOException e) {
         IOUtils.cleanup(null, ref);
@@ -1179,6 +1200,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
       return new ReplicaHandler(replica, ref);
     }
+  }

   /** Append to a finalized replica
    * Change a finalized replica to be a RBW replica and
@@ -1192,14 +1214,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @throws IOException if moving the replica from finalized directory
    *                     to rbw directory fails
    */
-  private synchronized ReplicaBeingWritten append(String bpid,
+  private ReplicaBeingWritten append(String bpid,
       FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     // If the block is cached, start uncaching it.
     cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());

-    // If there are any hardlinks to the block, break them. This ensures we are
-    // not appending to a file that is part of a previous/ directory.
+    // If there are any hardlinks to the block, break them. This ensures we
+    // are not appending to a file that is part of a previous/ directory.
     replicaInfo.breakHardLinksIfNeeded();

     // construct a RBW replica with the new GS
@@ -1253,6 +1276,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     v.reserveSpaceForReplica(bytesReserved);
     return newReplicaInfo;
   }
+  }

   private static class MustStopExistingWriter extends Exception {
     private final ReplicaInPipeline rip;
@@ -1321,7 +1345,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     while (true) {
       try {
-        synchronized (this) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);

           FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
@@ -1353,7 +1377,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     LOG.info("Recover failed close " + b);
     while (true) {
       try {
-        synchronized (this) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           // check replica's state
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           // bump the replica's GS
@@ -1400,9 +1424,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }

   @Override // FsDatasetSpi
-  public synchronized ReplicaHandler createRbw(
+  public ReplicaHandler createRbw(
       StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
       throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
         b.getBlockId());
     if (replicaInfo != null) {
@@ -1452,11 +1477,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throw e;
     }

-    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
+    ReplicaBeingWritten newReplicaInfo =
+        new ReplicaBeingWritten(b.getBlockId(),
         b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
     return new ReplicaHandler(newReplicaInfo, ref);
   }
+  }

   @Override // FsDatasetSpi
   public ReplicaHandler recoverRbw(
@@ -1466,7 +1493,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     while (true) {
       try {
-        synchronized (this) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());

           // check the replica's state
@@ -1487,9 +1514,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }

-  private synchronized ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
+  private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     // check generation stamp
     long replicaGenerationStamp = rbw.getGenerationStamp();
     if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1530,10 +1558,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     return new ReplicaHandler(rbw, ref);
   }
+  }

   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipeline convertTemporaryToRbw(
+  public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     final long blockId = b.getBlockId();
     final long expectedGs = b.getGenerationStamp();
     final long visible = b.getNumBytes();
@@ -1541,7 +1571,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         + visible);
     final ReplicaInPipeline temp;
-    {
+
     // get replica
     final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
     if (r == null) {
@@ -1554,7 +1584,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           "r.getState() != ReplicaState.TEMPORARY, r=" + r);
     }
     temp = (ReplicaInPipeline) r;
-    }
+
     // check generation stamp
     if (temp.getGenerationStamp() != expectedGs) {
       throw new ReplicaAlreadyExistsException(
@@ -1591,6 +1621,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     volumeMap.add(b.getBlockPoolId(), rbw);
     return rbw;
   }
+  }

   @Override // FsDatasetSpi
   public ReplicaHandler createTemporary(
@@ -1599,7 +1630,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
     ReplicaInfo lastFoundReplicaInfo = null;
     do {
-      synchronized (this) {
+      try (AutoCloseableLock lock = datasetLock.acquire()) {
         ReplicaInfo currentReplicaInfo =
             volumeMap.get(b.getBlockPoolId(), b.getBlockId());
         if (currentReplicaInfo == lastFoundReplicaInfo) {
@@ -1678,7 +1709,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Complete the block write!
    */
   @Override // FsDatasetSpi
-  public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
+  public void finalizeBlock(ExtendedBlock b) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     if (Thread.interrupted()) {
       // Don't allow data modifications from interrupted threads
       throw new IOException("Cannot finalize block from Interrupted Thread");
@@ -1691,13 +1723,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     finalizeReplica(b.getBlockPoolId(), replicaInfo);
   }
+  }

-  private synchronized FinalizedReplica finalizeReplica(String bpid,
+  private FinalizedReplica finalizeReplica(String bpid,
       ReplicaInfo replicaInfo) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     FinalizedReplica newReplicaInfo = null;
     if (replicaInfo.getState() == ReplicaState.RUR &&
-        ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
-        ReplicaState.FINALIZED) {
+        ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState()
+        == ReplicaState.FINALIZED) {
       newReplicaInfo = (FinalizedReplica)
           ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica();
     } else {
@@ -1710,11 +1744,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       File dest = v.addFinalizedBlock(
           bpid, replicaInfo, f, replicaInfo.getBytesReserved());
-      newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+      newReplicaInfo =
+          new FinalizedReplica(replicaInfo, v, dest.getParentFile());

       if (v.isTransientStorage()) {
         releaseLockedMemory(
-            replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(),
+            replicaInfo.getOriginalBytesReserved()
+                - replicaInfo.getNumBytes(),
             false);
         ramDiskReplicaTracker.addReplica(
             bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
@@ -1725,15 +1761,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return newReplicaInfo;
   }
+  }

   /**
    * Remove the temporary block file (if any)
    */
   @Override // FsDatasetSpi
-  public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
+  public void unfinalizeBlock(ExtendedBlock b) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
         b.getLocalBlock());
-    if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+    if (replicaInfo != null
+        && replicaInfo.getState() == ReplicaState.TEMPORARY) {
       // remove from volumeMap
       volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
@@ -1743,7 +1782,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         LOG.warn("Block " + b + " unfinalized and removed. ");
       }
       if (replicaInfo.getVolume().isTransientStorage()) {
-        ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+        ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(),
+            b.getBlockId(), true);
+      }
       }
     }
   }
@@ -1791,7 +1832,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
     }

-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
         switch(b.getState()) {
         case FINALIZED:
@@ -1824,7 +1865,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+  public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     ArrayList<FinalizedReplica> finalized =
         new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
     for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -1834,12 +1876,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     return finalized;
   }
+  }

   /**
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+  public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(
+      String bpid) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     ArrayList<FinalizedReplica> finalized =
         new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
     for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -1850,6 +1895,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     return finalized;
   }
+  }

   /**
    * Check if a block is valid.
@@ -1924,7 +1970,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   File validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final File f;
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       f = getFile(bpid, blockId, false);
     }

@@ -1973,7 +2019,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (int i = 0; i < invalidBlks.length; i++) {
       final File f;
       final FsVolumeImpl v;
-      synchronized (this) {
+      try (AutoCloseableLock lock = datasetLock.acquire()) {
         final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
         if (info == null) {
           // It is okay if the block is not found -- it may be deleted earlier.
@@ -2084,7 +2130,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long length, genstamp;
     Executor volumeExecutor;

-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo info = volumeMap.get(bpid, blockId);
       boolean success = false;
       try {
@@ -2151,10 +2197,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }

   @Override // FsDatasetSpi
-  public synchronized boolean contains(final ExtendedBlock block) {
+  public boolean contains(final ExtendedBlock block) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     final long blockId = block.getLocalBlock().getBlockId();
     return getFile(block.getBlockPoolId(), blockId, false) != null;
   }
+  }

   /**
    * Turn the block identifier into a filename
@@ -2279,7 +2327,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       File diskMetaFile, FsVolumeSpi vol) throws IOException {
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
@@ -2435,10 +2483,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }

   @Override
-  public synchronized String getReplicaString(String bpid, long blockId) {
+  public String getReplicaString(String bpid, long blockId) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     final Replica r = volumeMap.get(bpid, blockId);
     return r == null ? "null" : r.toString();
   }
+  }

   @Override // FsDatasetSpi
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
@@ -2530,11 +2580,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }

   @Override // FsDatasetSpi
-  public synchronized Replica updateReplicaUnderRecovery(
+  public Replica updateReplicaUnderRecovery(
       final ExtendedBlock oldBlock,
       final long recoveryId,
       final long newBlockId,
       final long newlength) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     //get replica
     final String bpid = oldBlock.getBlockPoolId();
     final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@@ -2592,6 +2643,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {

     return finalized;
   }
+  }

   private FinalizedReplica updateReplicaUnderRecovery(
       String bpid,
@@ -2668,8 +2720,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }

   @Override // FsDatasetSpi
-  public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
+  public long getReplicaVisibleLength(final ExtendedBlock block)
       throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     final Replica replica = getReplicaInfo(block.getBlockPoolId(),
         block.getBlockId());
     if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -2679,12 +2732,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     return replica.getVisibleLength();
   }
+  }

   @Override
   public void addBlockPool(String bpid, Configuration conf)
       throws IOException {
     LOG.info("Adding block pool " + bpid);
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       volumes.addBlockPool(bpid, conf);
       volumeMap.initBlockPool(bpid);
     }
@@ -2692,12 +2746,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }

   @Override
-  public synchronized void shutdownBlockPool(String bpid) {
+  public void shutdownBlockPool(String bpid) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     LOG.info("Removing block pool " + bpid);
-    Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume = getBlockReports(bpid);
+    Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
+        = getBlockReports(bpid);
     volumeMap.cleanUpBlockPool(bpid);
     volumes.removeBlockPool(bpid, blocksPerVolume);
   }
+  }

   /**
    * Class for representing the Datanode volume information
@@ -2759,14 +2816,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }

   @Override //FsDatasetSpi
-  public synchronized void deleteBlockPool(String bpid, boolean force)
+  public void deleteBlockPool(String bpid, boolean force)
       throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     List<FsVolumeImpl> curVolumes = volumes.getVolumes();
     if (!force) {
       for (FsVolumeImpl volume : curVolumes) {
         try (FsVolumeReference ref = volume.obtainReference()) {
           if (!volume.isBPDirEmpty(bpid)) {
-            LOG.warn(bpid + " has some block files, cannot delete unless forced");
+            LOG.warn(bpid
+                + " has some block files, cannot delete unless forced");
             throw new IOException("Cannot delete block pool, "
                 + "it contains some block files");
           }
@@ -2783,11 +2842,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
   }
+  }

   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
@@ -2838,7 +2898,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     @Override
     public void onCompleteLazyPersist(String bpId, long blockId,
         long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
-      synchronized (FsDatasetImpl.this) {
+      try (AutoCloseableLock lock = datasetLock.acquire()) {
         ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);

         targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@@ -2972,7 +3032,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       try {
         block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
         if (block != null) {
-          synchronized (FsDatasetImpl.this) {
+          try (AutoCloseableLock lock = datasetLock.acquire()) {
            replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());

            // If replicaInfo is null, the block was either deleted before
@@ -3042,7 +3102,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       long blockFileUsed, metaFileUsed;
       final String bpid = replicaState.getBlockPoolId();

-      synchronized (FsDatasetImpl.this) {
+      try (AutoCloseableLock lock = datasetLock.acquire()) {
         replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
             replicaState.getBlockId());
         Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@@ -3219,17 +3279,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.timer = newTimer;
   }

-  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+  void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
     for (String blockPoolId : volumeMap.getBlockPoolList()) {
       Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
       for (ReplicaInfo replicaInfo : replicas) {
         if (replicaInfo instanceof ReplicaInPipeline
             && replicaInfo.getVolume().equals(volume)) {
-          ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
+          ReplicaInPipeline replicaInPipeline
+              = (ReplicaInPipeline) replicaInfo;
           replicaInPipeline.interruptThread();
         }
       }
     }
+    }
   }
 }

FsVolumeImpl.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -304,7 +305,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   private void decDfsUsedAndNumBlocks(String bpid, long value,
                                       boolean blockFileDeleted) {
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.decDfsUsed(value);
@@ -316,7 +317,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }

   void incDfsUsedAndNumBlocks(String bpid, long value) {
-    synchronized (dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.incDfsUsed(value);
@@ -326,7 +327,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }

   void incDfsUsed(String bpid, long value) {
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.incDfsUsed(value);
@@ -337,7 +338,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @VisibleForTesting
   public long getDfsUsed() throws IOException {
     long dfsUsed = 0;
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       for(BlockPoolSlice s : bpSlices.values()) {
         dfsUsed += s.getDfsUsed();
       }

SimulatedFSDataset.java

@@ -39,6 +39,7 @@ import javax.management.StandardMBean;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -115,6 +116,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       DatanodeStorage.State.NORMAL;

   static final byte[] nullCrcFileData;
+
+  private final AutoCloseableLock datasetLock;
+
   static {
     DataChecksum checksum = DataChecksum.newDataChecksum(
         DataChecksum.Type.NULL, 16*1024 );
@@ -550,6 +554,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
         conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
     this.volume = new SimulatedVolume(this.storage);
+    this.datasetLock = new AutoCloseableLock();
   }

   public synchronized void injectBlocks(String bpid,
@@ -1366,5 +1371,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return null;
   }

+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return datasetLock.acquire();
+  }
 }

TestBlockRecovery.java

@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -725,7 +726,7 @@ public class TestBlockRecovery {
     final RecoveringBlock recoveringBlock = new RecoveringBlock(
         block.getBlock(), locations, block.getBlock()
             .getGenerationStamp() + 1);
-    synchronized (dataNode.data) {
+    try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
       Thread.sleep(2000);
       dataNode.initReplicaRecovery(recoveringBlock);
     }

TestDirectoryScanner.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -113,7 +114,7 @@ public class TestDirectoryScanner {

   /** Truncate a block file */
   private long truncateBlockFile() throws IOException {
-    synchronized (fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
@@ -138,7 +139,7 @@ public class TestDirectoryScanner {

   /** Delete a block file */
   private long deleteBlockFile() {
-    synchronized(fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
@@ -154,7 +155,7 @@ public class TestDirectoryScanner {

   /** Delete block meta file */
   private long deleteMetaFile() {
-    synchronized(fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File file = b.getMetaFile();
         // Delete a metadata file
@@ -173,7 +174,7 @@ public class TestDirectoryScanner {
    * @throws IOException
    */
   private void duplicateBlock(long blockId) throws IOException {
-    synchronized (fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
       try (FsDatasetSpi.FsVolumeReferences volumes =
           fds.getFsVolumeReferences()) {

ExternalDatasetImpl.java

@@ -23,6 +23,7 @@ import java.util.*;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -450,4 +451,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
     return null;
   }

+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return null;
+  }
 }
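
ExternalDatasetImpl is a compile-time stub whose purpose is to verify that FsDatasetSpi can be implemented outside the project, so returning null here appears deliberate. By the try-with-resources rules, a null resource is skipped rather than dereferenced, so a caller such as

  try (AutoCloseableLock lock = dataset.acquireDatasetLock()) {
    // with a null lock this block still runs, just without mutual exclusion;
    // close() is only invoked on non-null resources, so no NPE is thrown
  }

completes without error but with no locking. SimulatedFSDataset, which backs running tests, allocates a real AutoCloseableLock instead.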