HDFS-10682. Replace FsDatasetImpl object lock with a separate lock object. (Contributed by Chen Liang)

Arpit Agarwal 2016-08-17 16:22:00 -07:00
parent bb6d866207
commit ad0ac6cced
10 changed files with 535 additions and 447 deletions
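For readers skimming the hunks below: the change swaps synchronized blocks on the dataset object for try-with-resources around an explicit lock object. The following is a minimal, self-contained sketch of that idiom; the Dataset and visibleLength names are illustrative, and the stand-in lock class assumes only the acquire()/close() shape that org.apache.hadoop.util.AutoCloseableLock exhibits in the hunks below.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Minimal stand-in for org.apache.hadoop.util.AutoCloseableLock: acquire()
// takes the underlying lock and returns this; close() releases it, so
// try-with-resources unlocks on every exit path, including exceptions.
class AutoCloseableLock implements AutoCloseable {
  private final Lock lock = new ReentrantLock();

  public AutoCloseableLock acquire() {
    lock.lock();
    return this;
  }

  @Override
  public void close() {
    lock.unlock();
  }
}

class Dataset {
  private final AutoCloseableLock datasetLock = new AutoCloseableLock();

  // Before this change the method would have been declared synchronized.
  public long visibleLength() {
    try (AutoCloseableLock lock = datasetLock.acquire()) {
      return 0L; // critical section; the lock is released even if this throws
    }
  }
}

Making the lock a distinct object, rather than the FsDatasetImpl monitor, lets it be handed to collaborators through acquireDatasetLock() and later replaced by a different lock implementation without touching call sites.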

BlockSender.java

@ -46,6 +46,7 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;
@ -242,7 +243,7 @@ class BlockSender implements java.io.Closeable {
}
final long replicaVisibleLength;
synchronized(datanode.data) {
try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}

DataNode.java

@ -201,6 +201,7 @@ import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.TraceUtils;
import org.apache.hadoop.tracing.TracerConfigurationManager;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@ -2831,7 +2832,7 @@ public class DataNode extends ReconfigurableBase
final BlockConstructionStage stage;
//get replica information
synchronized(data) {
try(AutoCloseableLock lock = data.acquireDatasetLock()) {
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {

DirectoryScanner.java

@ -44,12 +44,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
@ -583,7 +585,7 @@ public class DirectoryScanner implements Runnable {
Map<String, ScanInfo[]> diskReport = getDiskReport();
// Hold FSDataset lock to prevent further changes to the block map
synchronized(dataset) {
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
String bpid = entry.getKey();
ScanInfo[] blockpoolReport = entry.getValue();

FsDatasetSpi.java

@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.ReflectionUtils;
/**
@ -641,4 +642,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Confirm whether the block is deleting
*/
boolean isDeletingBlock(String bpid, long blockId);
/**
* Acquire the lock of the dataset.
*/
AutoCloseableLock acquireDatasetLock();
}
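
A caller-side sketch of the new interface method, mirroring the call sites updated elsewhere in this commit (the helper class and method below are illustrative and not part of the patch):

import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.util.AutoCloseableLock;

class DatasetLockExample {
  // Replaces the old synchronized(dataset) blocks: the lock is an explicit
  // object now and is released automatically by try-with-resources.
  static void withDatasetLock(FsDatasetSpi<?> dataset, Runnable action) {
    try (AutoCloseableLock lock = dataset.acquireDatasetLock()) {
      action.run();
    }
  }
}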

FsDatasetImpl.java

@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@ -181,14 +182,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override
public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null? (FsVolumeImpl)r.getVolume(): null;
public FsVolumeImpl getVolume(final ExtendedBlock b) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
final ReplicaInfo r =
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null ? (FsVolumeImpl) r.getVolume() : null;
}
}
@Override // FsDatasetSpi
public synchronized Block getStoredBlock(String bpid, long blkid)
public Block getStoredBlock(String bpid, long blkid)
throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
File blockfile = getFile(bpid, blkid, false);
if (blockfile == null) {
return null;
@ -197,6 +202,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
return new Block(blkid, blockfile.length(), gs);
}
}
/**
@ -265,6 +271,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private boolean blockPinningEnabled;
private final int maxDataLength;
private final AutoCloseableLock datasetLock;
/**
* An FSDataset has a directory where it loads its data files.
*/
@ -275,6 +282,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
this.datasetLock = new AutoCloseableLock();
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
@ -381,10 +389,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Activate a volume to serve requests.
* @throws IOException if the storage UUID already exists.
*/
private synchronized void activateVolume(
private void activateVolume(
ReplicaMap replicaMap,
Storage.StorageDirectory sd, StorageType storageType,
FsVolumeReference ref) throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
if (dnStorage != null) {
final String errorMsg = String.format(
@ -401,6 +410,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
asyncDiskService.addVolume(sd.getCurrentDir());
volumes.addVolume(ref);
}
}
private void addVolume(Collection<StorageLocation> dataLocations,
Storage.StorageDirectory sd) throws IOException {
@ -494,7 +504,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
List<String> storageToRemove = new ArrayList<>();
synchronized (this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
final File absRoot = sd.getRoot().getAbsoluteFile();
@ -540,7 +550,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
synchronized (this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
for(String storageUuid : storageToRemove) {
storageMap.remove(storageUuid);
}
@ -749,7 +759,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
boolean touch)
throws IOException {
final File f;
synchronized(this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
}
if (f == null) {
@ -815,14 +825,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Returns handles to the block file and its metadata file
*/
@Override // FsDatasetSpi
public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
try {
InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset);
InputStream metaInStream =
openAndSeek(info.getMetaFile(), metaOffset);
return new ReplicaInputStreams(blockInStream, metaInStream, ref);
} catch (IOException e) {
IOUtils.cleanup(null, blockInStream);
@ -833,6 +845,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throw e;
}
}
}
private static FileInputStream openAndSeek(File file, long offset)
throws IOException {
@ -949,7 +962,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
FsVolumeReference volumeRef = null;
synchronized (this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
}
try {
@ -968,7 +981,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newReplicaInfo.setNumBytes(blockFiles[1].length());
// Finalize the copied files
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
synchronized (this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
// Increment numBlocks here as this block moved without knowing to BPS
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
@ -1100,8 +1113,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public synchronized ReplicaHandler append(ExtendedBlock b,
public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
@ -1128,14 +1142,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
ReplicaBeingWritten replica = null;
try {
replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
b.getNumBytes());
replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
newGS, b.getNumBytes());
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
return new ReplicaHandler(replica, ref);
}
}
/** Append to a finalized replica
* Change a finalized replica to be a RBW replica and
@ -1149,19 +1164,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @throws IOException if moving the replica from finalized directory
* to rbw directory fails
*/
private synchronized ReplicaBeingWritten append(String bpid,
private ReplicaBeingWritten append(String bpid,
FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
// If there are any hardlinks to the block, break them. This ensures we are
// not appending to a file that is part of a previous/ directory.
// If there are any hardlinks to the block, break them. This ensures we
// are not appending to a file that is part of a previous/ directory.
replicaInfo.breakHardLinksIfNeeded();
// construct a RBW replica with the new GS
File blkfile = replicaInfo.getBlockFile();
FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
if (v.getAvailable() < bytesReserved) {
throw new DiskOutOfSpaceException("Insufficient space for appending to "
@ -1210,6 +1226,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
v.reserveSpaceForReplica(bytesReserved);
return newReplicaInfo;
}
}
private static class MustStopExistingWriter extends Exception {
private final ReplicaInPipeline rip;
@ -1278,7 +1295,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (true) {
try {
synchronized (this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
@ -1310,7 +1327,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.info("Recover failed close " + b);
while (true) {
try {
synchronized (this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
@ -1357,9 +1374,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
public synchronized ReplicaHandler createRbw(
public ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
@ -1381,7 +1399,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// First try to place the block on a transient volume.
ref = volumes.getNextTransientVolume(b.getNumBytes());
datanode.getMetrics().incrRamDiskBlocksWrite();
} catch(DiskOutOfSpaceException de) {
} catch (DiskOutOfSpaceException de) {
// Ignore the exception since we just fall back to persistent storage.
} finally {
if (ref == null) {
@ -1409,11 +1427,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throw e;
}
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
ReplicaBeingWritten newReplicaInfo =
new ReplicaBeingWritten(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return new ReplicaHandler(newReplicaInfo, ref);
}
}
@Override // FsDatasetSpi
public ReplicaHandler recoverRbw(
@ -1423,7 +1443,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (true) {
try {
synchronized (this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
@ -1444,9 +1464,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
private synchronized ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
// check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
@ -1460,7 +1481,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// check replica length
long bytesAcked = rbw.getBytesAcked();
long numBytes = rbw.getNumBytes();
if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd) {
throw new ReplicaNotFoundException("Unmatched length replica " +
rbw + ": BytesAcked = " + bytesAcked +
" BytesRcvd = " + numBytes + " are not in the range of [" +
@ -1487,10 +1508,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
return new ReplicaHandler(rbw, ref);
}
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline convertTemporaryToRbw(
public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
@ -1498,7 +1521,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ visible);
final ReplicaInPipeline temp;
{
// get replica
final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
if (r == null) {
@ -1510,8 +1533,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throw new ReplicaAlreadyExistsException(
"r.getState() != ReplicaState.TEMPORARY, r=" + r);
}
temp = (ReplicaInPipeline)r;
}
temp = (ReplicaInPipeline) r;
// check generation stamp
if (temp.getGenerationStamp() != expectedGs) {
throw new ReplicaAlreadyExistsException(
@ -1530,7 +1553,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ visible + ", temp=" + temp);
}
// check volume
final FsVolumeImpl v = (FsVolumeImpl)temp.getVolume();
final FsVolumeImpl v = (FsVolumeImpl) temp.getVolume();
if (v == null) {
throw new IOException("r.getVolume() = null, temp=" + temp);
}
@ -1548,6 +1571,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumeMap.add(b.getBlockPoolId(), rbw);
return rbw;
}
}
@Override // FsDatasetSpi
public ReplicaHandler createTemporary(
@ -1556,7 +1580,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
ReplicaInfo lastFoundReplicaInfo = null;
do {
synchronized (this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo currentReplicaInfo =
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (currentReplicaInfo == lastFoundReplicaInfo) {
@ -1635,7 +1659,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Complete the block write!
*/
@Override // FsDatasetSpi
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
public void finalizeBlock(ExtendedBlock b) throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
@ -1648,17 +1673,19 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
}
private synchronized FinalizedReplica finalizeReplica(String bpid,
private FinalizedReplica finalizeReplica(String bpid,
ReplicaInfo replicaInfo) throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
FinalizedReplica newReplicaInfo = null;
if (replicaInfo.getState() == ReplicaState.RUR &&
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
ReplicaState.FINALIZED) {
((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState()
== ReplicaState.FINALIZED) {
newReplicaInfo = (FinalizedReplica)
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
((ReplicaUnderRecovery) replicaInfo).getOriginalReplica();
} else {
FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
File f = replicaInfo.getBlockFile();
if (v == null) {
throw new IOException("No volume for temporary file " + f +
@ -1667,11 +1694,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
File dest = v.addFinalizedBlock(
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
newReplicaInfo =
new FinalizedReplica(replicaInfo, v, dest.getParentFile());
if (v.isTransientStorage()) {
releaseLockedMemory(
replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(),
replicaInfo.getOriginalBytesReserved()
- replicaInfo.getNumBytes(),
false);
ramDiskReplicaTracker.addReplica(
bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
@ -1682,25 +1711,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return newReplicaInfo;
}
}
/**
* Remove the temporary block file (if any)
*/
@Override // FsDatasetSpi
public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
if (replicaInfo != null
&& replicaInfo.getState() == ReplicaState.TEMPORARY) {
// remove from volumeMap
volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
// delete the on-disk temp file
if (delBlockFromDisk(replicaInfo.getBlockFile(),
replicaInfo.getMetaFile(), b.getLocalBlock())) {
LOG.warn("Block " + b + " unfinalized and removed. " );
LOG.warn("Block " + b + " unfinalized and removed. ");
}
if (replicaInfo.getVolume().isTransientStorage()) {
ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(),
b.getBlockId(), true);
}
}
}
}
@ -1739,7 +1773,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = null;
synchronized(this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@ -1792,32 +1826,37 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Get the list of finalized blocks from in-memory blockmap for a block pool.
*/
@Override
public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) {
public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ArrayList<FinalizedReplica> finalized =
new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
if(b.getState() == ReplicaState.FINALIZED) {
finalized.add(new FinalizedReplica((FinalizedReplica)b));
if (b.getState() == ReplicaState.FINALIZED) {
finalized.add(new FinalizedReplica((FinalizedReplica) b));
}
}
return finalized;
}
}
/**
* Get the list of finalized blocks from in-memory blockmap for a block pool.
*/
@Override
public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
public List<FinalizedReplica>
getFinalizedBlocksOnPersistentStorage(String bpid) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ArrayList<FinalizedReplica> finalized =
new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
if(!b.getVolume().isTransientStorage() &&
if (!b.getVolume().isTransientStorage() &&
b.getState() == ReplicaState.FINALIZED) {
finalized.add(new FinalizedReplica((FinalizedReplica)b));
finalized.add(new FinalizedReplica((FinalizedReplica) b));
}
}
return finalized;
}
}
/**
* Check if a block is valid.
@ -1892,7 +1931,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
File validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too?
final File f;
synchronized(this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
f = getFile(bpid, blockId, false);
}
@ -1941,7 +1980,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
for (int i = 0; i < invalidBlks.length; i++) {
final File f;
final FsVolumeImpl v;
synchronized (this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
if (info == null) {
// It is okay if the block is not found -- it may be deleted earlier.
@ -2052,7 +2091,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long length, genstamp;
Executor volumeExecutor;
synchronized (this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
boolean success = false;
try {
@ -2119,10 +2158,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
public synchronized boolean contains(final ExtendedBlock block) {
public boolean contains(final ExtendedBlock block) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
final long blockId = block.getLocalBlock().getBlockId();
return getFile(block.getBlockPoolId(), blockId, false) != null;
}
}
/**
* Turn the block identifier into a filename
@ -2247,7 +2288,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
File diskMetaFile, FsVolumeSpi vol) throws IOException {
Block corruptBlock = null;
ReplicaInfo memBlockInfo;
synchronized (this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
// Block is not finalized - ignore the difference
@ -2403,9 +2444,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override
public synchronized String getReplicaString(String bpid, long blockId) {
public String getReplicaString(String bpid, long blockId) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
final Replica r = volumeMap.get(bpid, blockId);
return r == null? "null": r.toString();
return r == null ? "null" : r.toString();
}
}
@Override // FsDatasetSpi
@ -2498,11 +2541,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
public synchronized Replica updateReplicaUnderRecovery(
public Replica updateReplicaUnderRecovery(
final ExtendedBlock oldBlock,
final long recoveryId,
final long newBlockId,
final long newlength) throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
//get replica
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@ -2538,7 +2582,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newBlockId, newlength);
boolean copyTruncate = newBlockId != oldBlock.getBlockId();
if(!copyTruncate) {
if (!copyTruncate) {
assert finalized.getBlockId() == oldBlock.getBlockId()
&& finalized.getGenerationStamp() == recoveryId
&& finalized.getNumBytes() == newlength
@ -2560,6 +2604,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return finalized;
}
}
private FinalizedReplica updateReplicaUnderRecovery(
String bpid,
@ -2636,8 +2681,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
public long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
block.getBlockId());
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@ -2647,12 +2693,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
return replica.getVisibleLength();
}
}
@Override
public void addBlockPool(String bpid, Configuration conf)
throws IOException {
LOG.info("Adding block pool " + bpid);
synchronized(this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
volumes.addBlockPool(bpid, conf);
volumeMap.initBlockPool(bpid);
}
@ -2660,12 +2707,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override
public synchronized void shutdownBlockPool(String bpid) {
public void shutdownBlockPool(String bpid) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
LOG.info("Removing block pool " + bpid);
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume = getBlockReports(bpid);
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume =
getBlockReports(bpid);
volumeMap.cleanUpBlockPool(bpid);
volumes.removeBlockPool(bpid, blocksPerVolume);
}
}
/**
* Class for representing the Datanode volume information
@ -2727,14 +2777,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override //FsDatasetSpi
public synchronized void deleteBlockPool(String bpid, boolean force)
public void deleteBlockPool(String bpid, boolean force)
throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
if (!force) {
for (FsVolumeImpl volume : curVolumes) {
try (FsVolumeReference ref = volume.obtainReference()) {
if (!volume.isBPDirEmpty(bpid)) {
LOG.warn(bpid + " has some block files, cannot delete unless forced");
LOG.warn(bpid
+ " has some block files, cannot delete unless forced");
throw new IOException("Cannot delete block pool, "
+ "it contains some block files");
}
@ -2751,11 +2803,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
}
}
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
synchronized(this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
final Replica replica = volumeMap.get(block.getBlockPoolId(),
block.getBlockId());
if (replica == null) {
@ -2848,7 +2901,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public void onCompleteLazyPersist(String bpId, long blockId,
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
synchronized (FsDatasetImpl.this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@ -2982,7 +3035,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
try {
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
synchronized (FsDatasetImpl.this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
// If replicaInfo is null, the block was either deleted before
@ -3052,7 +3105,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long blockFileUsed, metaFileUsed;
final String bpid = replicaState.getBlockPoolId();
synchronized (FsDatasetImpl.this) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
replicaState.getBlockId());
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@ -3162,6 +3215,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
@Override
public AutoCloseableLock acquireDatasetLock() {
return datasetLock.acquire();
}
public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
synchronized (deletingBlock) {
Set<Long> s = deletingBlock.get(bpid);
@ -3229,7 +3287,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.timer = newTimer;
}
synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
void stopAllDataxceiverThreads(FsVolumeImpl volume) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
for (String blockPoolId : volumeMap.getBlockPoolList()) {
Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
for (ReplicaInfo replicaInfo : replicas) {
@ -3241,5 +3300,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
}
}
}

FsVolumeImpl.java

@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.StringUtils;
@ -304,7 +305,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
private void decDfsUsedAndNumBlocks(String bpid, long value,
boolean blockFileDeleted) {
synchronized(dataset) {
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
bp.decDfsUsed(value);
@ -316,7 +317,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
void incDfsUsedAndNumBlocks(String bpid, long value) {
synchronized (dataset) {
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
bp.incDfsUsed(value);
@ -326,7 +327,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
void incDfsUsed(String bpid, long value) {
synchronized(dataset) {
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
bp.incDfsUsed(value);
@ -337,7 +338,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
@VisibleForTesting
public long getDfsUsed() throws IOException {
long dfsUsed = 0;
synchronized(dataset) {
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
for(BlockPoolSlice s : bpSlices.values()) {
dfsUsed += s.getDfsUsed();
}

SimulatedFSDataset.java

@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
/**
@ -114,6 +115,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
private static final DatanodeStorage.State DEFAULT_STATE =
DatanodeStorage.State.NORMAL;
private final AutoCloseableLock datasetLock;
static final byte[] nullCrcFileData;
static {
DataChecksum checksum = DataChecksum.newDataChecksum(
@ -550,6 +553,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
this.volume = new SimulatedVolume(this.storage);
this.datasetLock = new AutoCloseableLock();
}
public synchronized void injectBlocks(String bpid,
@ -1365,5 +1369,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public boolean isDeletingBlock(String bpid, long blockId) {
throw new UnsupportedOperationException();
}
@Override
public AutoCloseableLock acquireDatasetLock() {
return datasetLock.acquire();
}
}

TestBlockRecovery.java

@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
@ -693,7 +694,7 @@ public class TestBlockRecovery {
final RecoveringBlock recoveringBlock = new RecoveringBlock(
block.getBlock(), locations, block.getBlock()
.getGenerationStamp() + 1);
synchronized (dataNode.data) {
try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
Thread.sleep(2000);
dataNode.initReplicaRecovery(recoveringBlock);
}

TestDirectoryScanner.java

@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.Test;
@ -113,7 +114,7 @@ public class TestDirectoryScanner {
/** Truncate a block file */
private long truncateBlockFile() throws IOException {
synchronized (fds) {
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = b.getBlockFile();
File mf = b.getMetaFile();
@ -138,7 +139,7 @@ public class TestDirectoryScanner {
/** Delete a block file */
private long deleteBlockFile() {
synchronized(fds) {
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = b.getBlockFile();
File mf = b.getMetaFile();
@ -154,7 +155,7 @@ public class TestDirectoryScanner {
/** Delete block meta file */
private long deleteMetaFile() {
synchronized(fds) {
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File file = b.getMetaFile();
// Delete a metadata file
@ -173,7 +174,7 @@ public class TestDirectoryScanner {
* @throws IOException
*/
private void duplicateBlock(long blockId) throws IOException {
synchronized (fds) {
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
try (FsDatasetSpi.FsVolumeReferences volumes =
fds.getFsVolumeReferences()) {

ExternalDatasetImpl.java

@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.util.AutoCloseableLock;
public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
@ -448,4 +449,9 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
public boolean isDeletingBlock(String bpid, long blockId) {
return false;
}
@Override
public AutoCloseableLock acquireDatasetLock() {
return null;
}
}