HDFS-11187. Optimize disk access for last partial chunk checksum of Finalized replica. Contributed by Wei-Chiu Chuang.
(cherry picked from commit 2021f4bdce3b27c46edaad198f0007a26a8a1391)

Conflicts:
    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java

parent d436c403b2
commit 29a3b64ec0
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -176,7 +176,12 @@ class BlockSender implements java.io.Closeable {
    */
   private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
-
+  // The number of bytes per checksum here determines the alignment
+  // of reads: we always start reading at a checksum chunk boundary,
+  // even if the checksum type is NULL. So, choosing too big of a value
+  // would risk sending too much unnecessary data. 512 (1 disk sector)
+  // is likely to result in minimal extra IO.
+  private static final long CHUNK_SIZE = 512;
 
   /**
    * Constructor
    *
@@ -250,18 +255,16 @@ class BlockSender implements java.io.Closeable {
     try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
       replica = getReplica(block, datanode);
       replicaVisibleLength = replica.getVisibleLength();
-      if (replica instanceof FinalizedReplica) {
-        // Load last checksum in case the replica is being written
-        // concurrently
-        final FinalizedReplica frep = (FinalizedReplica) replica;
-        chunkChecksum = frep.getLastChecksumAndDataLen();
-      }
     }
     if (replica.getState() == ReplicaState.RBW) {
       final ReplicaInPipeline rbw = (ReplicaInPipeline) replica;
       waitForMinLength(rbw, startOffset + length);
       chunkChecksum = rbw.getLastChecksumAndDataLen();
     }
+    if (replica instanceof FinalizedReplica) {
+      chunkChecksum = getPartialChunkChecksumForFinalized(
+          (FinalizedReplica)replica);
+    }
 
     if (replica.getGenerationStamp() < block.getGenerationStamp()) {
       throw new IOException("Replica gen stamp < block genstamp, block="
@@ -348,12 +351,8 @@ class BlockSender implements java.io.Closeable {
         }
       }
       if (csum == null) {
-        // The number of bytes per checksum here determines the alignment
-        // of reads: we always start reading at a checksum chunk boundary,
-        // even if the checksum type is NULL. So, choosing too big of a value
-        // would risk sending too much unnecessary data. 512 (1 disk sector)
-        // is likely to result in minimal extra IO.
-        csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
+        csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL,
+            (int)CHUNK_SIZE);
       }
 
       /*
@@ -427,6 +426,37 @@ class BlockSender implements java.io.Closeable {
     }
   }
 
+  private ChunkChecksum getPartialChunkChecksumForFinalized(
+      FinalizedReplica finalized) throws IOException {
+    // There are a number of places in the code base where a finalized replica
+    // object is created. If last partial checksum is loaded whenever a
+    // finalized replica is created, it would increase latency in DataNode
+    // initialization. Therefore, the last partial chunk checksum is loaded
+    // lazily.
+
+    // Load last checksum in case the replica is being written concurrently
+    final long replicaVisibleLength = replica.getVisibleLength();
+    if (replicaVisibleLength % CHUNK_SIZE != 0 &&
+        finalized.getLastPartialChunkChecksum() == null) {
+      // the finalized replica does not have precomputed last partial
+      // chunk checksum. Recompute now.
+      try {
+        finalized.loadLastPartialChunkChecksum();
+        return new ChunkChecksum(finalized.getVisibleLength(),
+            finalized.getLastPartialChunkChecksum());
+      } catch (FileNotFoundException e) {
+        // meta file is lost. Continue anyway to preserve existing behavior.
+        DataNode.LOG.warn(
+            "meta file " + finalized.getMetaFile() + " is missing!");
+        return null;
+      }
+    } else {
+      // If the checksum is null, BlockSender will use on-disk checksum.
+      return new ChunkChecksum(finalized.getVisibleLength(),
+          finalized.getLastPartialChunkChecksum());
+    }
+  }
+
   /**
    * close opened files.
    */
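Note: the helper added in the last hunk reads the meta file only when two things hold at once: the replica's visible length ends mid-chunk, and no checksum has been cached yet. A minimal standalone sketch of that gating arithmetic, with illustrative names (this is not HDFS code):

```java
// Sketch: a last partial chunk checksum is only needed when the replica
// length is not a multiple of the checksum chunk size.
public final class PartialChunkCheck {
  private static final long CHUNK_SIZE = 512; // same value BlockSender uses

  // True when the final chunk is partial, i.e. the visible length does not
  // fall on a chunk boundary.
  static boolean endsInPartialChunk(long visibleLength) {
    return visibleLength % CHUNK_SIZE != 0;
  }

  public static void main(String[] args) {
    System.out.println(endsInPartialChunk(1024)); // false: two full chunks
    System.out.println(endsInPartialChunk(1300)); // true: 276 bytes into chunk 3
  }
}
```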
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -30,9 +29,9 @@
  * This class describes a replica that has been finalized.
  */
 public class FinalizedReplica extends LocalReplica {
-
+  private byte[] lastPartialChunkChecksum;
   /**
-   * Constructor
+   * Constructor.
    * @param blockId block id
    * @param len replica length
    * @param genStamp replica generation stamp
@@ -41,7 +40,22 @@ public class FinalizedReplica extends LocalReplica {
    */
   public FinalizedReplica(long blockId, long len, long genStamp,
       FsVolumeSpi vol, File dir) {
+    this(blockId, len, genStamp, vol, dir, null);
+  }
+
+  /**
+   * Constructor.
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param checksum the last partial chunk checksum
+   */
+  public FinalizedReplica(long blockId, long len, long genStamp,
+      FsVolumeSpi vol, File dir, byte[] checksum) {
     super(blockId, len, genStamp, vol, dir);
+    this.setLastPartialChunkChecksum(checksum);
   }
 
   /**
@@ -51,7 +65,20 @@ public FinalizedReplica(long blockId, long len, long genStamp,
    * @param dir directory path where block and meta files are located
    */
   public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
+    this(block, vol, dir, null);
+  }
+
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param checksum the last partial chunk checksum
+   */
+  public FinalizedReplica(Block block, FsVolumeSpi vol, File dir,
+      byte[] checksum) {
     super(block, vol, dir);
+    this.setLastPartialChunkChecksum(checksum);
   }
 
   /**
@@ -60,6 +87,7 @@ public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
    */
   public FinalizedReplica(FinalizedReplica from) {
     super(from);
+    this.setLastPartialChunkChecksum(from.getLastPartialChunkChecksum());
   }
 
   @Override // ReplicaInfo
@@ -116,30 +144,18 @@ public ReplicaRecoveryInfo createInfo() {
         " does not support createInfo");
   }
 
-  /**
-   * gets the last chunk checksum and the length of the block corresponding
-   * to that checksum.
-   * Note, need to be called with the FsDataset lock acquired. May improve to
-   * lock only the FsVolume in the future.
-   * @throws IOException
-   */
-  public ChunkChecksum getLastChecksumAndDataLen() throws IOException {
-    ChunkChecksum chunkChecksum = null;
-    try {
-      byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
-          getBlockFile(), getMetaFile());
-      if (lastChecksum != null) {
-        chunkChecksum =
-            new ChunkChecksum(getVisibleLength(), lastChecksum);
-      }
-    } catch (FileNotFoundException e) {
-      // meta file is lost. Try to continue anyway.
-      DataNode.LOG.warn("meta file " + getMetaFile() +
-          " is missing!");
-    } catch (IOException ioe) {
-      DataNode.LOG.warn("Unable to read checksum from meta file " +
-          getMetaFile(), ioe);
-    }
-    return chunkChecksum;
-  }
+  public byte[] getLastPartialChunkChecksum() {
+    return lastPartialChunkChecksum;
+  }
+
+  public void setLastPartialChunkChecksum(byte[] checksum) {
+    lastPartialChunkChecksum = checksum;
+  }
+
+  public void loadLastPartialChunkChecksum()
+      throws IOException {
+    byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
+        getBlockFile(), getMetaFile());
+    setLastPartialChunkChecksum(lastChecksum);
+  }
 }
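Note: after this change FinalizedReplica merely caches the checksum bytes; loadLastPartialChunkChecksum() fills the cache from the volume, and callers decide when that single disk read is worth paying. A sketch of the same load-once pattern in isolation; the ChecksumLoader interface is hypothetical, standing in for the volume read:

```java
import java.io.IOException;

// Sketch of the cache-then-load pattern: the byte[] stays null until some
// caller pays for one disk read, after which every access is in-memory.
class LazyChecksumCache {
  interface ChecksumLoader {        // hypothetical stand-in for the volume read
    byte[] load() throws IOException;
  }

  private byte[] lastPartialChunkChecksum;

  byte[] get(ChecksumLoader loader) throws IOException {
    if (lastPartialChunkChecksum == null) {
      lastPartialChunkChecksum = loader.load(); // first access hits the disk
    }
    return lastPartialChunkChecksum;            // later accesses do not
  }
}
```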
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
@@ -39,6 +39,7 @@ public class ReplicaBuilder {
   private Thread writer;
   private long recoveryId;
   private Block block;
+  private byte[] lastPartialChunkChecksum;
 
   private ReplicaInfo fromReplica;
 
@@ -105,6 +106,11 @@ public ReplicaBuilder setBlock(Block block) {
     return this;
   }
 
+  public ReplicaBuilder setLastPartialChunkChecksum(byte[] checksum) {
+    this.lastPartialChunkChecksum = checksum;
+    return this;
+  }
+
   public LocalReplicaInPipeline buildLocalReplicaInPipeline()
       throws IllegalArgumentException {
     LocalReplicaInPipeline info = null;
@@ -185,10 +191,11 @@ private ReplicaInfo buildFinalizedReplica() throws IllegalArgumentException {
           + "state: " + fromReplica.getState());
     } else {
       if (null != block) {
-        return new FinalizedReplica(block, volume, directoryUsed);
+        return new FinalizedReplica(block, volume, directoryUsed,
+            lastPartialChunkChecksum);
       } else {
         return new FinalizedReplica(blockId, length, genStamp, volume,
-            directoryUsed);
+            directoryUsed, lastPartialChunkChecksum);
       }
     }
   }
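Note: the new setter keeps the builder's fluent style, letting a caller that already holds the checksum in memory thread it through to the FinalizedReplica constructor. A usage sketch mirroring the addFinalizedBlock() call site later in this commit (assumes hadoop-hdfs on the classpath; parameters come from the caller):

```java
import java.io.File;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

class BuilderUsageSketch {
  // Rebuild a finalized replica, carrying the cached checksum along so the
  // DataNode does not have to re-read the meta file later.
  static ReplicaInfo rebuildFinalized(Block block, FsVolumeSpi volume,
      File destDir, byte[] checksum) {
    return new ReplicaBuilder(ReplicaState.FINALIZED)
        .setBlock(block)
        .setFsVolume(volume)
        .setDirectoryToUse(destDir)
        .setLastPartialChunkChecksum(checksum) // avoids a later meta-file read
        .build();
  }
}
```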
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1653,6 +1653,7 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
         replicaInfo.getOriginalReplica().getState()
             == ReplicaState.FINALIZED) {
       newReplicaInfo = replicaInfo.getOriginalReplica();
+      ((FinalizedReplica)newReplicaInfo).loadLastPartialChunkChecksum();
     } else {
       FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
       if (v == null) {
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -51,6 +51,8 @@
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -943,10 +945,22 @@ ReplicaInfo addFinalizedBlock(String bpid, Block b, ReplicaInfo replicaInfo,
       long bytesReserved) throws IOException {
     releaseReservedSpace(bytesReserved);
     File dest = getBlockPoolSlice(bpid).addFinalizedBlock(b, replicaInfo);
+    byte[] checksum = null;
+    // copy the last partial checksum if the replica is originally
+    // in finalized or rbw state.
+    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+      FinalizedReplica finalized = (FinalizedReplica)replicaInfo;
+      checksum = finalized.getLastPartialChunkChecksum();
+    } else if (replicaInfo.getState() == ReplicaState.RBW) {
+      ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+      checksum = rbw.getLastChecksumAndDataLen().getChecksum();
+    }
+
     return new ReplicaBuilder(ReplicaState.FINALIZED)
         .setBlock(replicaInfo)
         .setFsVolume(this)
         .setDirectoryToUse(dest.getParentFile())
+        .setLastPartialChunkChecksum(checksum)
         .build();
   }
 
@@ -1176,12 +1190,11 @@ public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
         .setBytesToReserve(bytesReserved)
         .buildLocalReplicaInPipeline();
 
+    // Only a finalized replica can be appended.
+    FinalizedReplica finalized = (FinalizedReplica)replicaInfo;
     // load last checksum and datalen
-    LocalReplica localReplica = (LocalReplica)replicaInfo;
-    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
-        localReplica.getBlockFile(), localReplica.getMetaFile());
     newReplicaInfo.setLastChecksumAndDataLen(
-        replicaInfo.getNumBytes(), lastChunkChecksum);
+        finalized.getVisibleLength(), finalized.getLastPartialChunkChecksum());
 
     // rename meta file to rbw directory
     // rename block file to rbw directory
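Note: in both hunks above the last partial chunk checksum now travels in memory: an RBW replica already tracks its latest checksum as data is written, so finalizing or re-opening for append no longer re-reads the meta file. For reference, a stand-in with the same shape as the ChunkChecksum value used here (illustrative only, not the DataNode's own class):

```java
// Illustrative stand-in: ChunkChecksum couples the data length that has been
// checksummed with the checksum bytes of the (possibly partial) last chunk.
final class ChunkChecksumSketch {
  private final long dataLength;  // bytes covered, e.g. replica visible length
  private final byte[] checksum;  // checksum of the trailing partial chunk

  ChunkChecksumSketch(long dataLength, byte[] checksum) {
    this.dataLength = dataLength;
    this.checksum = checksum;
  }

  long getDataLength() { return dataLength; }
  byte[] getChecksum() { return checksum; }
}
```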
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2958,6 +2958,29 @@ public static File getBlockFile(File storageDir, ExtendedBlock blk) {
         blk.getBlockPoolId()), blk.getBlockId()), blk.getBlockName());
   }
 
+  /**
+   * Return all block files in given directory (recursive search).
+   */
+  public static List<File> getAllBlockFiles(File storageDir) {
+    List<File> results = new ArrayList<File>();
+    File[] files = storageDir.listFiles();
+    if (files == null) {
+      return null;
+    }
+    for (File f : files) {
+      if (f.getName().startsWith(Block.BLOCK_FILE_PREFIX) &&
+          !f.getName().endsWith(Block.METADATA_EXTENSION)) {
+        results.add(f);
+      } else if (f.isDirectory()) {
+        List<File> subdirResults = getAllBlockFiles(f);
+        if (subdirResults != null) {
+          results.addAll(subdirResults);
+        }
+      }
+    }
+    return results;
+  }
+
   /**
    * Get the latest metadata file correpsonding to a block
    * @param storageDir storage directory
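Note: the new helper selects block files purely by name, keeping files whose names begin with Block.BLOCK_FILE_PREFIX ("blk_") while skipping their ".meta" checksum companions. The same scan written against java.nio, offered as a hedged equivalent rather than part of the commit:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BlockFileScan {
  // Recursive scan with the same selection rule as getAllBlockFiles():
  // keep "blk_*" files, drop the "*.meta" checksum files.
  static List<File> allBlockFiles(File storageDir) throws IOException {
    try (Stream<Path> paths = Files.walk(storageDir.toPath())) {
      return paths.map(Path::toFile)
          .filter(f -> f.isFile()
              && f.getName().startsWith("blk_")
              && !f.getName().endsWith(".meta"))
          .collect(Collectors.toList());
    }
  }
}
```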
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -92,7 +92,7 @@ public void testListCorruptFilesCorruptedBlock() throws Exception {
       File storageDir = cluster.getInstanceStorageDir(0, 1);
       File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
       assertTrue("data directory does not exist", data_dir.exists());
-      List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
+      List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir);
       assertTrue("Data directory does not contain any blocks or there was an "
           + "IO error", metaFiles != null && !metaFiles.isEmpty());
       File metaFile = metaFiles.get(0);
@@ -172,7 +172,7 @@ public void testListCorruptFileBlocksInSafeMode() throws Exception {
       File data_dir = MiniDFSCluster.getFinalizedDir(storageDir,
          cluster.getNamesystem().getBlockPoolId());
       assertTrue("data directory does not exist", data_dir.exists());
-      List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
+      List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir);
       assertTrue("Data directory does not contain any blocks or there was an "
           + "IO error", metaFiles != null && !metaFiles.isEmpty());
       File metaFile = metaFiles.get(0);