HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
Author: arp, 2014-09-20 13:25:23 -07:00 (committed by Jitendra Pandey)
parent f6903ca945
commit bf1b84abe1
11 changed files with 557 additions and 347 deletions

DFSConfigKeys.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import org.apache.hadoop.hdfs.web.AuthFilter;
 import org.apache.hadoop.http.HttpConfig;
@@ -129,6 +130,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
   public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
   public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
+  public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
+  public static final Class<RamDiskReplicaLruTracker> DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
   public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
   public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10;
   public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS = "dfs.datanode.ram.disk.low.watermark.replicas";
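The two keys added above are what make the scheme pluggable: the DataNode instantiates whichever RamDiskReplicaTracker subclass the configuration names, falling back to the LRU tracker. A minimal sketch of selecting the tracker programmatically, assuming only the classes introduced in this commit (the default LRU tracker is named explicitly here; a custom subclass could be substituted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker;

public class TrackerConfigSketch {
  public static Configuration chooseTracker() {
    Configuration conf = new Configuration();
    // Any subclass of RamDiskReplicaTracker on the DataNode classpath can be
    // named here; when the key is unset, RamDiskReplicaLruTracker is used.
    conf.setClass(DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY,
                  RamDiskReplicaLruTracker.class, RamDiskReplicaTracker.class);
    return conf;
  }
}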

DataNode.java

@@ -2088,7 +2088,8 @@ public class DataNode extends ReconfigurableBase
       LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
     }
-    if (blockScanner != null) {
+    FsVolumeSpi volume = getFSDataset().getVolume(block);
+    if (blockScanner != null && !volume.isTransientStorage()) {
       blockScanner.addBlock(block);
     }
   }

BlockPoolSlice.java

@@ -292,9 +292,9 @@ class BlockPoolSlice {
    * Move a persisted replica from lazypersist directory to a subdirectory
    * under finalized.
    */
-  File activateSavedReplica(Block b, File blockFile) throws IOException {
+  File activateSavedReplica(Block b, File metaFile, File blockFile)
+      throws IOException {
     final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
-    final File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     final File targetBlockFile = new File(blockDir, blockFile.getName());
     final File targetMetaFile = new File(blockDir, metaFile.getName());
     FileUtils.moveFile(blockFile, targetBlockFile);
@@ -313,7 +313,7 @@ class BlockPoolSlice {
   void getVolumeMap(ReplicaMap volumeMap,
-      final LazyWriteReplicaTracker lazyWriteReplicaMap)
+      final RamDiskReplicaTracker lazyWriteReplicaMap)
       throws IOException {
     // Recover lazy persist replicas, they will be added to the volumeMap
     // when we scan the finalized directory.
@@ -410,7 +410,7 @@ class BlockPoolSlice {
    * false if the directory has rbw replicas
    */
   void addToReplicasMap(ReplicaMap volumeMap, File dir,
-      final LazyWriteReplicaTracker lazyWriteReplicaMap,
+      final RamDiskReplicaTracker lazyWriteReplicaMap,
       boolean isFinalized)
       throws IOException {
     File files[] = FileUtil.listFiles(dir);
@@ -487,7 +487,8 @@ class BlockPoolSlice {
       // it is in the lazyWriteReplicaMap so it can be persisted
       // eventually.
       if (newReplica.getVolume().isTransientStorage()) {
-        lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume());
+        lazyWriteReplicaMap.addReplica(bpid, blockId,
+            (FsVolumeImpl) newReplica.getVolume());
       } else {
         lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
       }

FsDatasetImpl.java

@@ -30,7 +30,6 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -88,6 +87,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -159,7 +159,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public synchronized Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    File blockfile = getFile(bpid, blkid);
+    File blockfile = getFile(bpid, blkid, false);
     if (blockfile == null) {
       return null;
     }
@@ -219,7 +219,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private volatile boolean fsRunning;

   final ReplicaMap volumeMap;
-  final LazyWriteReplicaTracker lazyWriteReplicaTracker;
+  final RamDiskReplicaTracker ramDiskReplicaTracker;

   private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
@@ -263,7 +263,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
     volumeMap = new ReplicaMap(this);
-    lazyWriteReplicaTracker = new LazyWriteReplicaTracker(this);
+    ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);

     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
@@ -298,7 +298,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
     ReplicaMap tempVolumeMap = new ReplicaMap(this);
-    fsVolume.getVolumeMap(volumeMap, lazyWriteReplicaTracker);
+    fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);

     volumeMap.addAll(tempVolumeMap);
     volumes.addVolume(fsVolume);
@@ -326,7 +326,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (final String bpid : bpids) {
       try {
         fsVolume.addBlockPool(bpid, this.conf);
-        fsVolume.getVolumeMap(bpid, tempVolumeMap);
+        fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
       } catch (IOException e) {
         LOG.warn("Caught exception when adding " + fsVolume +
             ". Will throw later.", e);
@@ -586,12 +586,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * checking that it exists. This should be used when the
    * next operation is going to open the file for read anyway,
    * and thus the exists check is redundant.
+   *
+   * @param touch if true then update the last access timestamp of the
+   *              block. Currently used for blocks on transient storage.
    */
-  private File getBlockFileNoExistsCheck(ExtendedBlock b)
+  private File getBlockFileNoExistsCheck(ExtendedBlock b,
+                                         boolean touch)
       throws IOException {
     final File f;
     synchronized(this) {
-      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId());
+      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
     }
     if (f == null) {
       throw new IOException("Block " + b + " is not valid");
@@ -602,7 +606,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {
-    File blockFile = getBlockFileNoExistsCheck(b);
+    File blockFile = getBlockFileNoExistsCheck(b, true);
     if (isNativeIOAvailable) {
       return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
     } else {
@@ -1240,7 +1244,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
       if (v.isTransientStorage()) {
-        lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+        ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
       }
     }
     volumeMap.add(bpid, newReplicaInfo);
@@ -1265,7 +1269,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.warn("Block " + b + " unfinalized and removed. " );
     }
     if (replicaInfo.getVolume().isTransientStorage()) {
-      lazyWriteReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+      ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
     }
   }
 }
@@ -1411,7 +1415,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     //Should we check for metadata file too?
     final File f;
     synchronized(this) {
-      f = getFile(bpid, blockId);
+      f = getFile(bpid, blockId, false);
     }
     if(f != null ) {
@@ -1496,7 +1500,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
       if (v.isTransientStorage()) {
-        lazyWriteReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
+        ramDiskReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
       }

       // If a DFSClient has the replica in its cache of short-circuit file
@@ -1628,7 +1632,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public synchronized boolean contains(final ExtendedBlock block) {
     final long blockId = block.getLocalBlock().getBlockId();
-    return getFile(block.getBlockPoolId(), blockId) != null;
+    return getFile(block.getBlockPoolId(), blockId, false) != null;
   }

   /**
@@ -1637,9 +1641,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param blockId a block's id
    * @return on disk data file path; null if the replica does not exist
    */
-  File getFile(final String bpid, final long blockId) {
+  File getFile(final String bpid, final long blockId, boolean touch) {
     ReplicaInfo info = volumeMap.get(bpid, blockId);
     if (info != null) {
+      if (touch && info.getVolume().isTransientStorage()) {
+        ramDiskReplicaTracker.touch(bpid, blockId);
+      }
       return info.getBlockFile();
     }
     return null;
@@ -1808,7 +1815,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       blockScanner.deleteBlock(bpid, new Block(blockId));
     }
     if (vol.isTransientStorage()) {
-      lazyWriteReplicaTracker.discardReplica(bpid, blockId, true);
+      ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
     }
     LOG.warn("Removed block " + blockId
         + " from memory with missing block file on the disk");
@@ -1830,11 +1837,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           diskFile.length(), diskGS, vol, diskFile.getParentFile());
       volumeMap.add(bpid, diskBlockInfo);
       final DataBlockScanner blockScanner = datanode.getBlockScanner();
+      if (!vol.isTransientStorage()) {
         if (blockScanner != null) {
           blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
         }
-      if (vol.isTransientStorage()) {
-        lazyWriteReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
+      } else {
+        ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
       }
       LOG.warn("Added missing block to memory " + diskBlockInfo);
       return;
@@ -2117,7 +2125,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volumes.addBlockPool(bpid, conf);
       volumeMap.initBlockPool(bpid);
     }
-    volumes.getAllVolumesMap(bpid, volumeMap, lazyWriteReplicaTracker);
+    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
   }

   @Override
@@ -2347,7 +2355,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
       }
-      lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
+      ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
       bpSlice = targetVolume.getBlockPoolSlice(bpid);
       srcMeta = replicaInfo.getMetaFile();
       srcFile = replicaInfo.getBlockFile();
@@ -2359,7 +2367,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);

       synchronized (FsDatasetImpl.this) {
-        lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
+        ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);

         if (LOG.isDebugEnabled()) {
           LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
@@ -2374,21 +2382,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      * @return true if there is more work to be done, false otherwise.
      */
     private boolean saveNextReplica() {
-      LazyWriteReplicaTracker.ReplicaState replicaState = null;
+      RamDiskReplica block = null;
       boolean succeeded = false;

       try {
-        replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
-        if (replicaState != null) {
-          moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
+        block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
+        if (block != null) {
+          moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId());
         }
         succeeded = true;
       } catch(IOException ioe) {
-        LOG.warn("Exception saving replica " + replicaState, ioe);
+        LOG.warn("Exception saving replica " + block, ioe);
       } finally {
-        if (!succeeded && replicaState != null) {
-          LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
-          lazyWriteReplicaTracker.reenqueueReplicaNotPersisted(replicaState);
+        if (!succeeded && block != null) {
+          LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
+          ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
         }
       }
@@ -2426,8 +2434,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
              transientFreeSpaceBelowThreshold()) {
-        LazyWriteReplicaTracker.ReplicaState replicaState =
-            lazyWriteReplicaTracker.getNextCandidateForEviction();
+        RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();

         if (replicaState == null) {
           break;
@@ -2440,46 +2447,48 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         ReplicaInfo replicaInfo, newReplicaInfo;
         File blockFile, metaFile;
         long blockFileUsed, metaFileUsed;
+        final String bpid = replicaState.getBlockPoolId();

         synchronized (FsDatasetImpl.this) {
-          replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
+          replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId());
           Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
           blockFile = replicaInfo.getBlockFile();
           metaFile = replicaInfo.getMetaFile();
           blockFileUsed = blockFile.length();
           metaFileUsed = metaFile.length();
-          lazyWriteReplicaTracker.discardReplica(replicaState, false);
+          ramDiskReplicaTracker.discardReplica(replicaState, false);

           // Move the replica from lazyPersist/ to finalized/ on target volume
           BlockPoolSlice bpSlice =
-              replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
+              replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
           File newBlockFile = bpSlice.activateSavedReplica(
-              replicaInfo, replicaState.savedBlockFile);
+              replicaInfo, replicaState.getSavedMetaFile(),
+              replicaState.getSavedBlockFile());

           newReplicaInfo =
               new FinalizedReplica(replicaInfo.getBlockId(),
                                    replicaInfo.getBytesOnDisk(),
                                    replicaInfo.getGenerationStamp(),
-                                   replicaState.lazyPersistVolume,
+                                   replicaState.getLazyPersistVolume(),
                                    newBlockFile.getParentFile());

           // Update the volumeMap entry.
-          volumeMap.add(replicaState.bpid, newReplicaInfo);
+          volumeMap.add(bpid, newReplicaInfo);
         }

         // Before deleting the files from transient storage we must notify the
         // NN that the files are on the new storage. Else a blockReport from
         // the transient storage might cause the NN to think the blocks are lost.
         ExtendedBlock extendedBlock =
-            new ExtendedBlock(replicaState.bpid, newReplicaInfo);
+            new ExtendedBlock(bpid, newReplicaInfo);
         datanode.notifyNamenodeReceivedBlock(
             extendedBlock, null, newReplicaInfo.getStorageUuid());

         // Remove the old replicas from transient storage.
         if (blockFile.delete() || !blockFile.exists()) {
-          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
+          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
           if (metaFile.delete() || !metaFile.exists()) {
-            ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
+            ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
           }
         }
@@ -2500,7 +2509,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         // Sleep if we have no more work to do or if it looks like we are not
         // making any forward progress. This is to ensure that if all persist
         // operations are failing we don't keep retrying them in a tight loop.
-        if (numSuccessiveFailures >= lazyWriteReplicaTracker.numReplicasNotPersisted()) {
+        if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) {
           Thread.sleep(checkpointerInterval * 1000);
           numSuccessiveFailures = 0;
         }

FsVolumeImpl.java

@@ -235,9 +235,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @Override
   public void reserveSpaceForRbw(long bytesToReserve) {
     if (bytesToReserve != 0) {
-      if (FsDatasetImpl.LOG.isDebugEnabled()) {
-        FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath());
-      }
       reservedForRbw.addAndGet(bytesToReserve);
     }
   }
@@ -245,9 +242,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @Override
   public void releaseReservedSpace(long bytesToRelease) {
     if (bytesToRelease != 0) {
-      if (FsDatasetImpl.LOG.isDebugEnabled()) {
-        FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath());
-      }
       long oldReservation, newReservation;
       do {
@@ -298,17 +292,17 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }

   void getVolumeMap(ReplicaMap volumeMap,
-                    final LazyWriteReplicaTracker lazyWriteReplicaMap)
+                    final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
     for(BlockPoolSlice s : bpSlices.values()) {
-      s.getVolumeMap(volumeMap, lazyWriteReplicaMap);
+      s.getVolumeMap(volumeMap, ramDiskReplicaMap);
     }
   }

   void getVolumeMap(String bpid, ReplicaMap volumeMap,
-                    final LazyWriteReplicaTracker lazyWriteReplicaMap)
+                    final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
-    getBlockPoolSlice(bpid).getVolumeMap(volumeMap, lazyWriteReplicaMap);
+    getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
   }

   @Override

FsVolumeList.java

@@ -121,7 +121,7 @@ class FsVolumeList {
   void getAllVolumesMap(final String bpid,
                         final ReplicaMap volumeMap,
-                        final LazyWriteReplicaTracker lazyWriteReplicaMap)
+                        final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
     long totalStartTime = Time.monotonicNow();
     final List<IOException> exceptions = Collections.synchronizedList(
@@ -134,7 +134,7 @@ class FsVolumeList {
           FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
               bpid + " on volume " + v + "...");
           long startTime = Time.monotonicNow();
-          v.getVolumeMap(bpid, volumeMap, lazyWriteReplicaMap);
+          v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
           long timeTaken = Time.monotonicNow() - startTime;
           FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
               + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");

LazyWriteReplicaTracker.java (deleted)

@@ -1,268 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.collect.TreeMultimap;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import java.io.File;
import java.util.*;
class LazyWriteReplicaTracker {
enum State {
IN_MEMORY,
LAZY_PERSIST_IN_PROGRESS,
LAZY_PERSIST_COMPLETE,
}
static class ReplicaState implements Comparable<ReplicaState> {
final String bpid;
final long blockId;
State state;
/**
* transient storage volume that holds the original replica.
*/
final FsVolumeSpi transientVolume;
/**
* Persistent volume that holds or will hold the saved replica.
*/
FsVolumeImpl lazyPersistVolume;
File savedMetaFile;
File savedBlockFile;
ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) {
this.bpid = bpid;
this.blockId = blockId;
this.transientVolume = transientVolume;
state = State.IN_MEMORY;
lazyPersistVolume = null;
savedMetaFile = null;
savedBlockFile = null;
}
void deleteSavedFiles() {
try {
if (savedBlockFile != null) {
savedBlockFile.delete();
savedBlockFile = null;
}
if (savedMetaFile != null) {
savedMetaFile.delete();
savedMetaFile = null;
}
} catch (Throwable t) {
// Ignore any exceptions.
}
}
@Override
public String toString() {
return "[Bpid=" + bpid + ";blockId=" + blockId + "]";
}
@Override
public int hashCode() {
return bpid.hashCode() ^ (int) blockId;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
ReplicaState otherState = (ReplicaState) other;
return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
}
@Override
public int compareTo(ReplicaState other) {
if (blockId == other.blockId) {
return 0;
} else if (blockId < other.blockId) {
return -1;
} else {
return 1;
}
}
}
final FsDatasetImpl fsDataset;
/**
* Map of blockpool ID to map of blockID to ReplicaInfo.
*/
final Map<String, Map<Long, ReplicaState>> replicaMaps;
/**
* Queue of replicas that need to be written to disk.
* Stale entries are GC'd by dequeueNextReplicaToPersist.
*/
final Queue<ReplicaState> replicasNotPersisted;
/**
* Queue of replicas in the order in which they were persisted.
* We'll dequeue them in the same order.
* We can improve the eviction scheme later.
* Stale entries are GC'd by getNextCandidateForEviction.
*/
final Queue<ReplicaState> replicasPersisted;
LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
this.fsDataset = fsDataset;
replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
replicasNotPersisted = new LinkedList<ReplicaState>();
replicasPersisted = new LinkedList<ReplicaState>();
}
synchronized void addReplica(String bpid, long blockId,
final FsVolumeSpi transientVolume) {
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
if (map == null) {
map = new HashMap<Long, ReplicaState>();
replicaMaps.put(bpid, map);
}
ReplicaState replicaState = new ReplicaState(bpid, blockId, transientVolume);
map.put(blockId, replicaState);
replicasNotPersisted.add(replicaState);
}
synchronized void recordStartLazyPersist(
final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
ReplicaState replicaState = map.get(blockId);
replicaState.state = State.LAZY_PERSIST_IN_PROGRESS;
replicaState.lazyPersistVolume = checkpointVolume;
}
/**
* @param bpid
* @param blockId
* @param savedFiles The saved meta and block files, in that order.
*/
synchronized void recordEndLazyPersist(
final String bpid, final long blockId, final File[] savedFiles) {
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
ReplicaState replicaState = map.get(blockId);
if (replicaState == null) {
throw new IllegalStateException("Unknown replica bpid=" +
bpid + "; blockId=" + blockId);
}
replicaState.state = State.LAZY_PERSIST_COMPLETE;
replicaState.savedMetaFile = savedFiles[0];
replicaState.savedBlockFile = savedFiles[1];
if (replicasNotPersisted.peek() == replicaState) {
// Common case.
replicasNotPersisted.remove();
} else {
// Should never occur in practice as lazy writer always persists
// the replica at the head of the queue before moving to the next
// one.
replicasNotPersisted.remove(replicaState);
}
replicasPersisted.add(replicaState);
}
synchronized ReplicaState dequeueNextReplicaToPersist() {
while (replicasNotPersisted.size() != 0) {
ReplicaState replicaState = replicasNotPersisted.remove();
Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
return replicaState;
}
// The replica no longer exists, look for the next one.
}
return null;
}
synchronized void reenqueueReplicaNotPersisted(final ReplicaState replicaState) {
replicasNotPersisted.add(replicaState);
}
synchronized void reenqueueReplicaPersisted(final ReplicaState replicaState) {
replicasPersisted.add(replicaState);
}
synchronized int numReplicasNotPersisted() {
return replicasNotPersisted.size();
}
synchronized ReplicaState getNextCandidateForEviction() {
while (replicasPersisted.size() != 0) {
ReplicaState replicaState = replicasPersisted.remove();
Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
return replicaState;
}
// The replica no longer exists, look for the next one.
}
return null;
}
void discardReplica(ReplicaState replicaState, boolean deleteSavedCopies) {
discardReplica(replicaState.bpid, replicaState.blockId, deleteSavedCopies);
}
/**
* Discard any state we are tracking for the given replica. This could mean
* the block is either deleted from the block space or the replica is no longer
* on transient storage.
*
* @param deleteSavedCopies true if we should delete the saved copies on
* persistent storage. This should be set by the
* caller when the block is no longer needed.
*/
synchronized void discardReplica(
final String bpid, final long blockId,
boolean deleteSavedCopies) {
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
if (map == null) {
return;
}
ReplicaState replicaState = map.get(blockId);
if (replicaState == null) {
return;
}
if (deleteSavedCopies) {
replicaState.deleteSavedFiles();
}
map.remove(blockId);
}
}

RamDiskReplicaLruTracker.java (new file)

@@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.collect.TreeMultimap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.File;
import java.util.*;
/**
* An implementation of RamDiskReplicaTracker that uses an LRU
* eviction scheme.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
private class RamDiskReplicaLru extends RamDiskReplica {
long lastUsedTime;
private RamDiskReplicaLru(String bpid, long blockId, FsVolumeImpl ramDiskVolume) {
super(bpid, blockId, ramDiskVolume);
}
}
/**
* Map of blockpool ID to <map of blockID to ReplicaInfo>.
*/
Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps;
/**
* Queue of replicas that need to be written to disk.
* Stale entries are GC'd by dequeueNextReplicaToPersist.
*/
Queue<RamDiskReplicaLru> replicasNotPersisted;
/**
* Map of persisted replicas ordered by their last use times.
*/
TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
RamDiskReplicaLruTracker() {
replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>();
replicasNotPersisted = new LinkedList<RamDiskReplicaLru>();
replicasPersisted = TreeMultimap.create();
}
@Override
synchronized void addReplica(final String bpid, final long blockId,
final FsVolumeImpl transientVolume) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
if (map == null) {
map = new HashMap<Long, RamDiskReplicaLru>();
replicaMaps.put(bpid, map);
}
RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume);
map.put(blockId, ramDiskReplicaLru);
replicasNotPersisted.add(ramDiskReplicaLru);
}
@Override
synchronized void touch(final String bpid,
final long blockId) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
if (ramDiskReplicaLru == null) {
return;
}
// Reinsert the replica with its new timestamp.
if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
}
}
@Override
synchronized void recordStartLazyPersist(
final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
ramDiskReplicaLru.setLazyPersistVolume(checkpointVolume);
}
@Override
synchronized void recordEndLazyPersist(
final String bpid, final long blockId, final File[] savedFiles) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
if (ramDiskReplicaLru == null) {
throw new IllegalStateException("Unknown replica bpid=" +
bpid + "; blockId=" + blockId);
}
ramDiskReplicaLru.recordSavedBlockFiles(savedFiles);
if (replicasNotPersisted.peek() == ramDiskReplicaLru) {
// Common case.
replicasNotPersisted.remove();
} else {
// Caller error? Fallback to O(n) removal.
replicasNotPersisted.remove(ramDiskReplicaLru);
}
ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
}
@Override
synchronized RamDiskReplicaLru dequeueNextReplicaToPersist() {
while (replicasNotPersisted.size() != 0) {
RamDiskReplicaLru ramDiskReplicaLru = replicasNotPersisted.remove();
Map<Long, RamDiskReplicaLru> replicaMap =
replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
return ramDiskReplicaLru;
}
// The replica no longer exists, look for the next one.
}
return null;
}
@Override
synchronized void reenqueueReplicaNotPersisted(final RamDiskReplica ramDiskReplicaLru) {
replicasNotPersisted.add((RamDiskReplicaLru) ramDiskReplicaLru);
}
@Override
synchronized int numReplicasNotPersisted() {
return replicasNotPersisted.size();
}
@Override
synchronized RamDiskReplicaLru getNextCandidateForEviction() {
Iterator it = replicasPersisted.values().iterator();
while (it.hasNext()) {
RamDiskReplicaLru ramDiskReplicaLru = (RamDiskReplicaLru) it.next();
it.remove();
Map<Long, RamDiskReplicaLru> replicaMap =
replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
return ramDiskReplicaLru;
}
// The replica no longer exists, look for the next one.
}
return null;
}
/**
* Discard any state we are tracking for the given replica. This could mean
* the block is either deleted from the block space or the replica is no longer
* on transient storage.
*
* @param deleteSavedCopies true if we should delete the saved copies on
* persistent storage. This should be set by the
* caller when the block is no longer needed.
*/
@Override
synchronized void discardReplica(
final String bpid, final long blockId,
boolean deleteSavedCopies) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
if (map == null) {
return;
}
RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
if (ramDiskReplicaLru == null) {
return;
}
if (deleteSavedCopies) {
ramDiskReplicaLru.deleteSavedFiles();
}
map.remove(blockId);
replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
// replicasNotPersisted will be lazily GC'ed.
}
}
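The LRU behavior above comes from Guava's TreeMultimap: persisted replicas are keyed by lastUsedTime, getNextCandidateForEviction() walks the values in ascending key order, and touch() re-files a replica under a fresh timestamp. A self-contained sketch of that idiom, with made-up timestamps and strings standing in for replicas:

import com.google.common.collect.TreeMultimap;

public class LruIdiomSketch {
  public static void main(String[] args) {
    TreeMultimap<Long, String> byLastUse = TreeMultimap.create();
    byLastUse.put(100L, "replica-A");   // least recently used
    byLastUse.put(200L, "replica-B");
    byLastUse.put(300L, "replica-C");

    // "Touch" replica-A: remove it under its old timestamp and reinsert
    // it under a newer one, mirroring what touch() does above.
    if (byLastUse.remove(100L, "replica-A")) {
      byLastUse.put(400L, "replica-A");
    }

    // Eviction iterates values() in key order, so the oldest remaining
    // entry comes out first.
    String victim = byLastUse.values().iterator().next();
    System.out.println("Evict: " + victim);  // prints: Evict: replica-B
  }
}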

RamDiskReplicaTracker.java (new file)

@@ -0,0 +1,245 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.File;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class RamDiskReplicaTracker {
FsDatasetImpl fsDataset;
static class RamDiskReplica implements Comparable<RamDiskReplica> {
private final String bpid;
private final long blockId;
private File savedBlockFile;
private File savedMetaFile;
/**
* RAM_DISK volume that holds the original replica.
*/
final FsVolumeSpi ramDiskVolume;
/**
* Persistent volume that holds or will hold the saved replica.
*/
FsVolumeImpl lazyPersistVolume;
RamDiskReplica(final String bpid, final long blockId,
final FsVolumeImpl ramDiskVolume) {
this.bpid = bpid;
this.blockId = blockId;
this.ramDiskVolume = ramDiskVolume;
lazyPersistVolume = null;
savedMetaFile = null;
savedBlockFile = null;
}
long getBlockId() {
return blockId;
}
String getBlockPoolId() {
return bpid;
}
FsVolumeImpl getLazyPersistVolume() {
return lazyPersistVolume;
}
void setLazyPersistVolume(FsVolumeImpl volume) {
Preconditions.checkState(!volume.isTransientStorage());
this.lazyPersistVolume = volume;
}
File getSavedBlockFile() {
return savedBlockFile;
}
File getSavedMetaFile() {
return savedMetaFile;
}
/**
* Record the saved meta and block files on the given volume.
*
* @param files Meta and block files, in that order.
*/
void recordSavedBlockFiles(File[] files) {
this.savedMetaFile = files[0];
this.savedBlockFile = files[1];
}
@Override
public int hashCode() {
return bpid.hashCode() ^ (int) blockId;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
RamDiskReplica otherState = (RamDiskReplica) other;
return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
}
// Delete the saved meta and block files. Failure to delete can be
// ignored, the directory scanner will retry the deletion later.
void deleteSavedFiles() {
try {
if (savedBlockFile != null) {
savedBlockFile.delete();
savedBlockFile = null;
}
if (savedMetaFile != null) {
savedMetaFile.delete();
savedMetaFile = null;
}
} catch (Throwable t) {
// Ignore any exceptions.
}
}
@Override
public int compareTo(RamDiskReplica other) {
int bpidResult = bpid.compareTo(other.bpid);
if (bpidResult == 0)
if (blockId == other.blockId) {
return 0;
} else if (blockId < other.blockId) {
return -1;
} else {
return 1;
}
return bpidResult;
}
@Override
public String toString() {
return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]";
}
}
/**
* Get an instance of the configured RamDiskReplicaTracker based on the
* configuration property
* {@link org.apache.hadoop.hdfs.DFSConfigKeys#DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY}.
*
* @param conf the configuration to be used
* @param fsDataset the FsDataset object.
* @return an instance of RamDiskReplicaTracker
*/
static RamDiskReplicaTracker getInstance(final Configuration conf,
final FsDatasetImpl fsDataset) {
final Class<? extends RamDiskReplicaTracker> trackerClass = conf.getClass(
DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY,
DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT,
RamDiskReplicaTracker.class);
final RamDiskReplicaTracker tracker = ReflectionUtils.newInstance(
trackerClass, conf);
tracker.initialize(fsDataset);
return tracker;
}
void initialize(final FsDatasetImpl fsDataset) {
this.fsDataset = fsDataset;
}
/**
* Start tracking a new finalized replica on RAM disk.
*
* @param transientVolume RAM disk volume that stores the replica.
*/
abstract void addReplica(final String bpid, final long blockId,
final FsVolumeImpl transientVolume);
/**
* Invoked when a replica is opened by a client. This may be used as
* a heuristic by the eviction scheme.
*/
abstract void touch(final String bpid, final long blockId);
/**
* Get the next replica to write to persistent storage.
*/
abstract RamDiskReplica dequeueNextReplicaToPersist();
/**
* Invoked if a replica that was previously dequeued for persistence
* could not be successfully persisted. Add it back so it can be retried
* later.
*/
abstract void reenqueueReplicaNotPersisted(
final RamDiskReplica ramDiskReplica);
/**
* Invoked when the Lazy persist operation is started by the DataNode.
* @param checkpointVolume the persistent volume chosen to hold the saved replica.
*/
abstract void recordStartLazyPersist(
final String bpid, final long blockId, FsVolumeImpl checkpointVolume);
/**
* Invoked when the Lazy persist operation is complete.
*
* @param savedFiles The saved meta and block files, in that order.
*/
abstract void recordEndLazyPersist(
final String bpid, final long blockId, final File[] savedFiles);
/**
* Return a candidate replica to remove from RAM Disk. The exact replica
* to be returned may depend on the eviction scheme utilized.
*
* @return the replica chosen for eviction, or null if none is available.
*/
abstract RamDiskReplica getNextCandidateForEviction();
/**
* Return the number of replicas pending persistence to disk.
*/
abstract int numReplicasNotPersisted();
/**
* Discard all state we are tracking for the given replica.
*/
abstract void discardReplica(
final String bpid, final long blockId,
boolean deleteSavedCopies);
void discardReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
discardReplica(replica.getBlockPoolId(), replica.getBlockId(), deleteSavedCopies);
}
}
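Because getInstance() loads the tracker reflectively via ReflectionUtils.newInstance, swapping in a different eviction scheme means subclassing RamDiskReplicaTracker and naming the subclass under dfs.datanode.ram.disk.replica.tracker. Below is a hypothetical FIFO variant, not part of this commit; it must live in the fsdataset.impl package because the abstract methods are package-private, and it filters stale queue entries lazily, as the LRU tracker does:

package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: evicts persisted replicas in the order they finished
// persisting and ignores reads entirely.
public class RamDiskReplicaFifoTracker extends RamDiskReplicaTracker {
  private final Map<String, Map<Long, RamDiskReplica>> replicaMaps =
      new HashMap<String, Map<Long, RamDiskReplica>>();
  private final Queue<RamDiskReplica> notPersisted =
      new LinkedList<RamDiskReplica>();
  private final Queue<RamDiskReplica> persisted =
      new LinkedList<RamDiskReplica>();

  @Override
  synchronized void addReplica(String bpid, long blockId,
      FsVolumeImpl transientVolume) {
    Map<Long, RamDiskReplica> map = replicaMaps.get(bpid);
    if (map == null) {
      map = new HashMap<Long, RamDiskReplica>();
      replicaMaps.put(bpid, map);
    }
    RamDiskReplica replica = new RamDiskReplica(bpid, blockId, transientVolume);
    map.put(blockId, replica);
    notPersisted.add(replica);
  }

  @Override
  synchronized void touch(String bpid, long blockId) {
    // FIFO ignores access recency, so reads change nothing.
  }

  @Override
  synchronized RamDiskReplica dequeueNextReplicaToPersist() {
    return pollLive(notPersisted);
  }

  @Override
  synchronized void reenqueueReplicaNotPersisted(RamDiskReplica replica) {
    notPersisted.add(replica);
  }

  @Override
  synchronized void recordStartLazyPersist(String bpid, long blockId,
      FsVolumeImpl checkpointVolume) {
    replicaMaps.get(bpid).get(blockId).setLazyPersistVolume(checkpointVolume);
  }

  @Override
  synchronized void recordEndLazyPersist(String bpid, long blockId,
      File[] savedFiles) {
    Map<Long, RamDiskReplica> map = replicaMaps.get(bpid);
    RamDiskReplica replica = (map == null) ? null : map.get(blockId);
    if (replica == null) {
      throw new IllegalStateException(
          "Unknown replica bpid=" + bpid + "; blockId=" + blockId);
    }
    replica.recordSavedBlockFiles(savedFiles);
    notPersisted.remove(replica);  // no-op if already dequeued
    persisted.add(replica);        // eviction order == persistence order
  }

  @Override
  synchronized RamDiskReplica getNextCandidateForEviction() {
    return pollLive(persisted);
  }

  @Override
  synchronized int numReplicasNotPersisted() {
    return notPersisted.size();
  }

  @Override
  synchronized void discardReplica(String bpid, long blockId,
      boolean deleteSavedCopies) {
    Map<Long, RamDiskReplica> map = replicaMaps.get(bpid);
    if (map == null) {
      return;
    }
    RamDiskReplica replica = map.remove(blockId);
    if (replica != null && deleteSavedCopies) {
      replica.deleteSavedFiles();
    }
    // Queue entries for this replica become stale; pollLive skips them.
  }

  // Pop entries until one is found that is still tracked in replicaMaps.
  private RamDiskReplica pollLive(Queue<RamDiskReplica> queue) {
    RamDiskReplica replica;
    while ((replica = queue.poll()) != null) {
      Map<Long, RamDiskReplica> map = replicaMaps.get(replica.getBlockPoolId());
      if (map != null && map.containsKey(replica.getBlockId())) {
        return replica;
      }
    }
    return null;
  }
}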

FsDatasetTestUtil.java

@@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 public class FsDatasetTestUtil {

   public static File getFile(FsDatasetSpi<?> fsd, String bpid, long bid) {
-    return ((FsDatasetImpl)fsd).getFile(bpid, bid);
+    return ((FsDatasetImpl)fsd).getFile(bpid, bid, false);
   }

   public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b

TestLazyPersistFiles.java

@@ -71,7 +71,7 @@ public class TestLazyPersistFiles {
   private static final int THREADPOOL_SIZE = 10;
   private static final short REPL_FACTOR = 1;
-  private static final int BLOCK_SIZE = 10485760; // 10 MB
+  private static final int BLOCK_SIZE = 5 * 1024 * 1024;
   private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   private static final long HEARTBEAT_INTERVAL_SEC = 1;
   private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
@@ -449,34 +449,51 @@ public class TestLazyPersistFiles {
    * @throws InterruptedException
    */
   @Test (timeout=300000)
-  public void testRamDiskEvictionLRU()
+  public void testRamDiskEvictionIsLru()
       throws IOException, InterruptedException {
-    startUpCluster(true, 3);
+    final int NUM_PATHS = 5;
+    startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    final int NUM_PATHS = 6;
-    Path paths[] = new Path[NUM_PATHS];
+    Path paths[] = new Path[NUM_PATHS * 2];

-    for (int i = 0; i < NUM_PATHS; i++) {
+    for (int i = 0; i < paths.length; i++) {
       paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
     }

-    // No eviction for the first half of files
-    for (int i = 0; i < NUM_PATHS/2; i++) {
+    for (int i = 0; i < NUM_PATHS; i++) {
       makeTestFile(paths[i], BLOCK_SIZE, true);
+    }
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    for (int i = 0; i < NUM_PATHS; ++i) {
       ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
     }

-    // Lazy persist writer persists the first half of files
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    // Open the files for read in a random order.
+    ArrayList<Integer> indexes = new ArrayList<Integer>(NUM_PATHS);
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      indexes.add(i);
+    }
+    Collections.shuffle(indexes);

-    // Create the second half of files with eviction upon each create.
-    for (int i = NUM_PATHS/2; i < NUM_PATHS; i++) {
-      makeTestFile(paths[i], BLOCK_SIZE, true);
-      ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
-      // path[i-NUM_PATHS/2] is expected to be evicted by LRU
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      LOG.info("Touching file " + paths[indexes.get(i)]);
+      DFSTestUtil.readFile(fs, paths[indexes.get(i)]);
+    }
+
+    // Create an equal number of new files ensuring that the previous
+    // files are evicted in the same order they were read.
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
       triggerBlockReport();
-      ensureFileReplicasOnStorageType(paths[i - NUM_PATHS / 2], DEFAULT);
+      Thread.sleep(3000);
+      ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
+      ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
+      for (int j = i + 1; j < NUM_PATHS; ++j) {
+        ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
+      }
     }
   }