HDFS-11187. Optimize disk access for last partial chunk checksum of Finalized replica. Contributed by Gabor Bota, Wei-Chiu Chuang.
This commit is contained in:
parent
a278ad6b91
commit
49ed7d7fc9
|
@ -175,8 +175,13 @@ class BlockSender implements java.io.Closeable {
|
|||
* See {{@link BlockSender#isLongRead()}
|
||||
*/
|
||||
private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
|
||||
|
||||
|
||||
// The number of bytes per checksum here determines the alignment
|
||||
// of reads: we always start reading at a checksum chunk boundary,
|
||||
// even if the checksum type is NULL. So, choosing too big of a value
|
||||
// would risk sending too much unnecessary data. 512 (1 disk sector)
|
||||
// is likely to result in minimal extra IO.
|
||||
private static final long CHUNK_SIZE = 512;
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
|
@ -250,12 +255,6 @@ class BlockSender implements java.io.Closeable {
|
|||
try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
|
||||
replica = getReplica(block, datanode);
|
||||
replicaVisibleLength = replica.getVisibleLength();
|
||||
if (replica instanceof FinalizedReplica) {
|
||||
// Load last checksum in case the replica is being written
|
||||
// concurrently
|
||||
final FinalizedReplica frep = (FinalizedReplica) replica;
|
||||
chunkChecksum = frep.getLastChecksumAndDataLen();
|
||||
}
|
||||
}
|
||||
// if there is a write in progress
|
||||
if (replica instanceof ReplicaBeingWritten) {
|
||||
|
@ -263,6 +262,10 @@ class BlockSender implements java.io.Closeable {
|
|||
waitForMinLength(rbw, startOffset + length);
|
||||
chunkChecksum = rbw.getLastChecksumAndDataLen();
|
||||
}
|
||||
if (replica instanceof FinalizedReplica) {
|
||||
chunkChecksum = getPartialChunkChecksumForFinalized(
|
||||
(FinalizedReplica)replica);
|
||||
}
|
||||
|
||||
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
||||
throw new IOException("Replica gen stamp < block genstamp, block="
|
||||
|
@ -349,12 +352,8 @@ class BlockSender implements java.io.Closeable {
|
|||
}
|
||||
}
|
||||
if (csum == null) {
|
||||
// The number of bytes per checksum here determines the alignment
|
||||
// of reads: we always start reading at a checksum chunk boundary,
|
||||
// even if the checksum type is NULL. So, choosing too big of a value
|
||||
// would risk sending too much unnecessary data. 512 (1 disk sector)
|
||||
// is likely to result in minimal extra IO.
|
||||
csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
|
||||
csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL,
|
||||
(int)CHUNK_SIZE);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -428,6 +427,37 @@ class BlockSender implements java.io.Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private ChunkChecksum getPartialChunkChecksumForFinalized(
|
||||
FinalizedReplica finalized) throws IOException {
|
||||
// There are a number of places in the code base where a finalized replica
|
||||
// object is created. If last partial checksum is loaded whenever a
|
||||
// finalized replica is created, it would increase latency in DataNode
|
||||
// initialization. Therefore, the last partial chunk checksum is loaded
|
||||
// lazily.
|
||||
|
||||
// Load last checksum in case the replica is being written concurrently
|
||||
final long replicaVisibleLength = replica.getVisibleLength();
|
||||
if (replicaVisibleLength % CHUNK_SIZE != 0 &&
|
||||
finalized.getLastPartialChunkChecksum() == null) {
|
||||
// the finalized replica does not have precomputed last partial
|
||||
// chunk checksum. Recompute now.
|
||||
try {
|
||||
finalized.loadLastPartialChunkChecksum();
|
||||
return new ChunkChecksum(finalized.getVisibleLength(),
|
||||
finalized.getLastPartialChunkChecksum());
|
||||
} catch (FileNotFoundException e) {
|
||||
// meta file is lost. Continue anyway to preserve existing behavior.
|
||||
DataNode.LOG.warn(
|
||||
"meta file " + finalized.getMetaFile() + " is missing!");
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
// If the checksum is null, BlockSender will use on-disk checksum.
|
||||
return new ChunkChecksum(finalized.getVisibleLength(),
|
||||
finalized.getLastPartialChunkChecksum());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* close opened files.
|
||||
*/
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
|
@ -29,7 +28,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|||
* This class describes a replica that has been finalized.
|
||||
*/
|
||||
public class FinalizedReplica extends ReplicaInfo {
|
||||
|
||||
private byte[] lastPartialChunkChecksum;
|
||||
/**
|
||||
* Constructor
|
||||
* @param blockId block id
|
||||
|
@ -40,9 +39,24 @@ public class FinalizedReplica extends ReplicaInfo {
|
|||
*/
|
||||
public FinalizedReplica(long blockId, long len, long genStamp,
|
||||
FsVolumeSpi vol, File dir) {
|
||||
super(blockId, len, genStamp, vol, dir);
|
||||
this(blockId, len, genStamp, vol, dir, null);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param blockId block id
|
||||
* @param len replica length
|
||||
* @param genStamp replica generation stamp
|
||||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
* @param checksum the last partial chunk checksum
|
||||
*/
|
||||
public FinalizedReplica(long blockId, long len, long genStamp,
|
||||
FsVolumeSpi vol, File dir, byte[] checksum) {
|
||||
super(blockId, len, genStamp, vol, dir);
|
||||
this.setLastPartialChunkChecksum(checksum);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param block a block
|
||||
|
@ -50,7 +64,20 @@ public class FinalizedReplica extends ReplicaInfo {
|
|||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
|
||||
this(block, vol, dir, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param block a block
|
||||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
* @param checksum the last partial chunk checksum
|
||||
*/
|
||||
public FinalizedReplica(Block block, FsVolumeSpi vol, File dir,
|
||||
byte[] checksum) {
|
||||
super(block, vol, dir);
|
||||
this.setLastPartialChunkChecksum(checksum);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -59,6 +86,7 @@ public class FinalizedReplica extends ReplicaInfo {
|
|||
*/
|
||||
public FinalizedReplica(FinalizedReplica from) {
|
||||
super(from);
|
||||
this.setLastPartialChunkChecksum(from.getLastPartialChunkChecksum());
|
||||
}
|
||||
|
||||
@Override // ReplicaInfo
|
||||
|
@ -91,30 +119,18 @@ public class FinalizedReplica extends ReplicaInfo {
|
|||
return super.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* gets the last chunk checksum and the length of the block corresponding
|
||||
* to that checksum.
|
||||
* Note, need to be called with the FsDataset lock acquired. May improve to
|
||||
* lock only the FsVolume in the future.
|
||||
* @throws IOException
|
||||
*/
|
||||
public ChunkChecksum getLastChecksumAndDataLen() throws IOException {
|
||||
ChunkChecksum chunkChecksum = null;
|
||||
try {
|
||||
byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
|
||||
getBlockFile(), getMetaFile());
|
||||
if (lastChecksum != null) {
|
||||
chunkChecksum =
|
||||
new ChunkChecksum(getVisibleLength(), lastChecksum);
|
||||
}
|
||||
} catch (FileNotFoundException e) {
|
||||
// meta file is lost. Try to continue anyway.
|
||||
DataNode.LOG.warn("meta file " + getMetaFile() +
|
||||
" is missing!");
|
||||
} catch (IOException ioe) {
|
||||
DataNode.LOG.warn("Unable to read checksum from meta file " +
|
||||
getMetaFile(), ioe);
|
||||
}
|
||||
return chunkChecksum;
|
||||
public byte[] getLastPartialChunkChecksum() {
|
||||
return lastPartialChunkChecksum;
|
||||
}
|
||||
|
||||
public void setLastPartialChunkChecksum(byte[] checksum) {
|
||||
lastPartialChunkChecksum = checksum;
|
||||
}
|
||||
|
||||
public void loadLastPartialChunkChecksum()
|
||||
throws IOException {
|
||||
byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
|
||||
getBlockFile(), getMetaFile());
|
||||
setLastPartialChunkChecksum(lastChecksum);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1244,10 +1244,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
|
||||
|
||||
// load last checksum and datalen
|
||||
byte[] lastChunkChecksum = v.loadLastPartialChunkChecksum(
|
||||
replicaInfo.getBlockFile(), replicaInfo.getMetaFile());
|
||||
newReplicaInfo.setLastChecksumAndDataLen(
|
||||
replicaInfo.getNumBytes(), lastChunkChecksum);
|
||||
replicaInfo.getNumBytes(), replicaInfo.getLastPartialChunkChecksum());
|
||||
|
||||
File newmeta = newReplicaInfo.getMetaFile();
|
||||
|
||||
|
@ -1805,6 +1803,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
== ReplicaState.FINALIZED) {
|
||||
newReplicaInfo = (FinalizedReplica)
|
||||
((ReplicaUnderRecovery) replicaInfo).getOriginalReplica();
|
||||
newReplicaInfo.loadLastPartialChunkChecksum();
|
||||
} else {
|
||||
FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
|
||||
File f = replicaInfo.getBlockFile();
|
||||
|
@ -1817,6 +1816,19 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
|
||||
newReplicaInfo =
|
||||
new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
||||
|
||||
byte[] checksum = null;
|
||||
// copy the last partial checksum if the replica is originally
|
||||
// in finalized or rbw state.
|
||||
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
|
||||
FinalizedReplica finalized = (FinalizedReplica)replicaInfo;
|
||||
checksum = finalized.getLastPartialChunkChecksum();
|
||||
} else if (replicaInfo.getState() == ReplicaState.RBW) {
|
||||
ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
|
||||
checksum = rbw.getLastChecksumAndDataLen().getChecksum();
|
||||
}
|
||||
newReplicaInfo.setLastPartialChunkChecksum(checksum);
|
||||
|
||||
if (v.isTransientStorage()) {
|
||||
releaseLockedMemory(
|
||||
replicaInfo.getOriginalBytesReserved()
|
||||
|
|
|
@ -2871,6 +2871,29 @@ public class MiniDFSCluster implements AutoCloseable {
|
|||
blk.getBlockPoolId()), blk.getBlockId()), blk.getBlockName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return all block files in given directory (recursive search).
|
||||
*/
|
||||
public static List<File> getAllBlockFiles(File storageDir) {
|
||||
List<File> results = new ArrayList<File>();
|
||||
File[] files = storageDir.listFiles();
|
||||
if (files == null) {
|
||||
return null;
|
||||
}
|
||||
for (File f : files) {
|
||||
if (f.getName().startsWith(Block.BLOCK_FILE_PREFIX) &&
|
||||
!f.getName().endsWith(Block.METADATA_EXTENSION)) {
|
||||
results.add(f);
|
||||
} else if (f.isDirectory()) {
|
||||
List<File> subdirResults = getAllBlockFiles(f);
|
||||
if (subdirResults != null) {
|
||||
results.addAll(subdirResults);
|
||||
}
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the latest metadata file correpsonding to a block
|
||||
* @param storageDir storage directory
|
||||
|
|
|
@ -92,7 +92,7 @@ public class TestListCorruptFileBlocks {
|
|||
File storageDir = cluster.getInstanceStorageDir(0, 1);
|
||||
File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
|
||||
assertTrue("data directory does not exist", data_dir.exists());
|
||||
List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
|
||||
List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir);
|
||||
assertTrue("Data directory does not contain any blocks or there was an "
|
||||
+ "IO error", metaFiles != null && !metaFiles.isEmpty());
|
||||
File metaFile = metaFiles.get(0);
|
||||
|
@ -172,7 +172,7 @@ public class TestListCorruptFileBlocks {
|
|||
File data_dir = MiniDFSCluster.getFinalizedDir(storageDir,
|
||||
cluster.getNamesystem().getBlockPoolId());
|
||||
assertTrue("data directory does not exist", data_dir.exists());
|
||||
List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
|
||||
List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir);
|
||||
assertTrue("Data directory does not contain any blocks or there was an "
|
||||
+ "IO error", metaFiles != null && !metaFiles.isEmpty());
|
||||
File metaFile = metaFiles.get(0);
|
||||
|
|
Loading…
Reference in New Issue