HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)

This commit is contained in:
arp 2014-09-20 13:25:23 -07:00
parent 09dab88d3e
commit b2d5ed36bc
12 changed files with 559 additions and 347 deletions

View File

@ -60,4 +60,6 @@
HDFS-7091. Add forwarding constructor for INodeFile for existing callers.
(Arpit Agarwal)
HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.hdfs.web.AuthFilter;
import org.apache.hadoop.http.HttpConfig;
@ -129,6 +130,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
public static final Class<RamDiskReplicaLruTracker> DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10;
public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS = "dfs.datanode.ram.disk.low.watermark.replicas";

View File

@ -2041,7 +2041,8 @@ public class DataNode extends ReconfigurableBase
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
if (blockScanner != null) {
FsVolumeSpi volume = getFSDataset().getVolume(block);
if (blockScanner != null && !volume.isTransientStorage()) {
blockScanner.addBlock(block);
}
}

View File

@ -286,9 +286,9 @@ class BlockPoolSlice {
* Move a persisted replica from lazypersist directory to a subdirectory
* under finalized.
*/
File activateSavedReplica(Block b, File blockFile) throws IOException {
File activateSavedReplica(Block b, File metaFile, File blockFile)
throws IOException {
final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
final File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
final File targetBlockFile = new File(blockDir, blockFile.getName());
final File targetMetaFile = new File(blockDir, metaFile.getName());
FileUtils.moveFile(blockFile, targetBlockFile);
@ -307,7 +307,7 @@ class BlockPoolSlice {
void getVolumeMap(ReplicaMap volumeMap,
final LazyWriteReplicaTracker lazyWriteReplicaMap)
final RamDiskReplicaTracker lazyWriteReplicaMap)
throws IOException {
// Recover lazy persist replicas, they will be added to the volumeMap
// when we scan the finalized directory.
@ -404,7 +404,7 @@ class BlockPoolSlice {
* false if the directory has rbw replicas
*/
void addToReplicasMap(ReplicaMap volumeMap, File dir,
final LazyWriteReplicaTracker lazyWriteReplicaMap,
final RamDiskReplicaTracker lazyWriteReplicaMap,
boolean isFinalized)
throws IOException {
File files[] = FileUtil.listFiles(dir);
@ -481,7 +481,8 @@ class BlockPoolSlice {
// it is in the lazyWriteReplicaMap so it can be persisted
// eventually.
if (newReplica.getVolume().isTransientStorage()) {
lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume());
lazyWriteReplicaMap.addReplica(bpid, blockId,
(FsVolumeImpl) newReplica.getVolume());
} else {
lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
}

View File

@ -30,7 +30,6 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -87,6 +86,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -158,7 +158,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public synchronized Block getStoredBlock(String bpid, long blkid)
throws IOException {
File blockfile = getFile(bpid, blkid);
File blockfile = getFile(bpid, blkid, false);
if (blockfile == null) {
return null;
}
@ -218,7 +218,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private volatile boolean fsRunning;
final ReplicaMap volumeMap;
final LazyWriteReplicaTracker lazyWriteReplicaTracker;
final RamDiskReplicaTracker ramDiskReplicaTracker;
private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
@ -262,7 +262,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(this);
lazyWriteReplicaTracker = new LazyWriteReplicaTracker(this);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
@ -297,7 +297,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
this, sd.getStorageUuid(), dir, this.conf, storageType);
ReplicaMap tempVolumeMap = new ReplicaMap(this);
fsVolume.getVolumeMap(tempVolumeMap, lazyWriteReplicaTracker);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
volumeMap.addAll(tempVolumeMap);
volumes.addVolume(fsVolume);
@ -325,7 +325,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
for (final String bpid : bpids) {
try {
fsVolume.addBlockPool(bpid, this.conf);
fsVolume.getVolumeMap(bpid, tempVolumeMap, lazyWriteReplicaTracker);
fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
} catch (IOException e) {
LOG.warn("Caught exception when adding " + fsVolume +
". Will throw later.", e);
@ -585,12 +585,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* checking that it exists. This should be used when the
* next operation is going to open the file for read anyway,
* and thus the exists check is redundant.
*
* @param touch if true then update the last access timestamp of the
* block. Currently used for blocks on transient storage.
*/
private File getBlockFileNoExistsCheck(ExtendedBlock b)
private File getBlockFileNoExistsCheck(ExtendedBlock b,
boolean touch)
throws IOException {
final File f;
synchronized(this) {
f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId());
f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
}
if (f == null) {
throw new IOException("Block " + b + " is not valid");
@ -601,7 +605,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException {
File blockFile = getBlockFileNoExistsCheck(b);
File blockFile = getBlockFileNoExistsCheck(b, true);
if (isNativeIOAvailable) {
return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
} else {
@ -1239,7 +1243,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
if (v.isTransientStorage()) {
lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
}
}
volumeMap.add(bpid, newReplicaInfo);
@ -1264,7 +1268,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.warn("Block " + b + " unfinalized and removed. " );
}
if (replicaInfo.getVolume().isTransientStorage()) {
lazyWriteReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
}
}
}
@ -1410,7 +1414,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
//Should we check for metadata file too?
final File f;
synchronized(this) {
f = getFile(bpid, blockId);
f = getFile(bpid, blockId, false);
}
if(f != null ) {
@ -1495,7 +1499,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
if (v.isTransientStorage()) {
lazyWriteReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
ramDiskReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
}
// If a DFSClient has the replica in its cache of short-circuit file
@ -1627,7 +1631,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public synchronized boolean contains(final ExtendedBlock block) {
final long blockId = block.getLocalBlock().getBlockId();
return getFile(block.getBlockPoolId(), blockId) != null;
return getFile(block.getBlockPoolId(), blockId, false) != null;
}
/**
@ -1636,9 +1640,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @param blockId a block's id
* @return on disk data file path; null if the replica does not exist
*/
File getFile(final String bpid, final long blockId) {
File getFile(final String bpid, final long blockId, boolean touch) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
if (info != null) {
if (touch && info.getVolume().isTransientStorage()) {
ramDiskReplicaTracker.touch(bpid, blockId);
}
return info.getBlockFile();
}
return null;
@ -1807,7 +1814,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
blockScanner.deleteBlock(bpid, new Block(blockId));
}
if (vol.isTransientStorage()) {
lazyWriteReplicaTracker.discardReplica(bpid, blockId, true);
ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
}
LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
@ -1829,11 +1836,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
diskFile.length(), diskGS, vol, diskFile.getParentFile());
volumeMap.add(bpid, diskBlockInfo);
final DataBlockScanner blockScanner = datanode.getBlockScanner();
if (blockScanner != null) {
blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
}
if (vol.isTransientStorage()) {
lazyWriteReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
if (!vol.isTransientStorage()) {
if (blockScanner != null) {
blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
}
} else {
ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
}
LOG.warn("Added missing block to memory " + diskBlockInfo);
return;
@ -2116,7 +2124,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumes.addBlockPool(bpid, conf);
volumeMap.initBlockPool(bpid);
}
volumes.getAllVolumesMap(bpid, volumeMap, lazyWriteReplicaTracker);
volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
}
@Override
@ -2346,7 +2354,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
}
lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
bpSlice = targetVolume.getBlockPoolSlice(bpid);
srcMeta = replicaInfo.getMetaFile();
srcFile = replicaInfo.getBlockFile();
@ -2358,7 +2366,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
synchronized (FsDatasetImpl.this) {
lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
@ -2373,21 +2381,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @return true if there is more work to be done, false otherwise.
*/
private boolean saveNextReplica() {
LazyWriteReplicaTracker.ReplicaState replicaState = null;
RamDiskReplica block = null;
boolean succeeded = false;
try {
replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
if (replicaState != null) {
moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId());
}
succeeded = true;
} catch(IOException ioe) {
LOG.warn("Exception saving replica " + replicaState, ioe);
LOG.warn("Exception saving replica " + block, ioe);
} finally {
if (!succeeded && replicaState != null) {
LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
lazyWriteReplicaTracker.reenqueueReplicaNotPersisted(replicaState);
if (!succeeded && block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
}
}
@ -2425,8 +2433,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
transientFreeSpaceBelowThreshold()) {
LazyWriteReplicaTracker.ReplicaState replicaState =
lazyWriteReplicaTracker.getNextCandidateForEviction();
RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
if (replicaState == null) {
break;
@ -2439,46 +2446,48 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo replicaInfo, newReplicaInfo;
File blockFile, metaFile;
long blockFileUsed, metaFileUsed;
final String bpid = replicaState.getBlockPoolId();
synchronized (FsDatasetImpl.this) {
replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId());
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
blockFile = replicaInfo.getBlockFile();
metaFile = replicaInfo.getMetaFile();
blockFileUsed = blockFile.length();
metaFileUsed = metaFile.length();
lazyWriteReplicaTracker.discardReplica(replicaState, false);
ramDiskReplicaTracker.discardReplica(replicaState, false);
// Move the replica from lazyPersist/ to finalized/ on target volume
BlockPoolSlice bpSlice =
replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
File newBlockFile = bpSlice.activateSavedReplica(
replicaInfo, replicaState.savedBlockFile);
replicaInfo, replicaState.getSavedMetaFile(),
replicaState.getSavedBlockFile());
newReplicaInfo =
new FinalizedReplica(replicaInfo.getBlockId(),
replicaInfo.getBytesOnDisk(),
replicaInfo.getGenerationStamp(),
replicaState.lazyPersistVolume,
replicaState.getLazyPersistVolume(),
newBlockFile.getParentFile());
// Update the volumeMap entry.
volumeMap.add(replicaState.bpid, newReplicaInfo);
volumeMap.add(bpid, newReplicaInfo);
}
// Before deleting the files from transient storage we must notify the
// NN that the files are on the new storage. Else a blockReport from
// the transient storage might cause the NN to think the blocks are lost.
ExtendedBlock extendedBlock =
new ExtendedBlock(replicaState.bpid, newReplicaInfo);
new ExtendedBlock(bpid, newReplicaInfo);
datanode.notifyNamenodeReceivedBlock(
extendedBlock, null, newReplicaInfo.getStorageUuid());
// Remove the old replicas from transient storage.
if (blockFile.delete() || !blockFile.exists()) {
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
if (metaFile.delete() || !metaFile.exists()) {
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
}
}
@ -2499,7 +2508,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Sleep if we have no more work to do or if it looks like we are not
// making any forward progress. This is to ensure that if all persist
// operations are failing we don't keep retrying them in a tight loop.
if (numSuccessiveFailures >= lazyWriteReplicaTracker.numReplicasNotPersisted()) {
if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) {
Thread.sleep(checkpointerInterval * 1000);
numSuccessiveFailures = 0;
}

View File

@ -235,9 +235,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
@Override
public void reserveSpaceForRbw(long bytesToReserve) {
if (bytesToReserve != 0) {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath());
}
reservedForRbw.addAndGet(bytesToReserve);
}
}
@ -245,9 +242,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
@Override
public void releaseReservedSpace(long bytesToRelease) {
if (bytesToRelease != 0) {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath());
}
long oldReservation, newReservation;
do {
@ -298,17 +292,17 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
void getVolumeMap(ReplicaMap volumeMap,
final LazyWriteReplicaTracker lazyWriteReplicaMap)
final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException {
for(BlockPoolSlice s : bpSlices.values()) {
s.getVolumeMap(volumeMap, lazyWriteReplicaMap);
s.getVolumeMap(volumeMap, ramDiskReplicaMap);
}
}
void getVolumeMap(String bpid, ReplicaMap volumeMap,
final LazyWriteReplicaTracker lazyWriteReplicaMap)
final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException {
getBlockPoolSlice(bpid).getVolumeMap(volumeMap, lazyWriteReplicaMap);
getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
}
@Override

View File

@ -121,7 +121,7 @@ class FsVolumeList {
void getAllVolumesMap(final String bpid,
final ReplicaMap volumeMap,
final LazyWriteReplicaTracker lazyWriteReplicaMap)
final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException {
long totalStartTime = Time.monotonicNow();
final List<IOException> exceptions = Collections.synchronizedList(
@ -134,7 +134,7 @@ class FsVolumeList {
FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
bpid + " on volume " + v + "...");
long startTime = Time.monotonicNow();
v.getVolumeMap(bpid, volumeMap, lazyWriteReplicaMap);
v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
long timeTaken = Time.monotonicNow() - startTime;
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
+ " " + bpid + " on volume " + v + ": " + timeTaken + "ms");

View File

@ -1,268 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.collect.TreeMultimap;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import java.io.File;
import java.util.*;
/**
 * Tracks replicas that were written to transient storage and still need to
 * be saved ("lazy persisted") to a persistent volume, or that have been
 * saved and may later be evicted from transient storage.
 *
 * Replicas waiting to be saved and replicas already saved are kept in two
 * FIFO queues. Entries for replicas that have since been discarded are left
 * in the queues and skipped lazily when they are dequeued.
 *
 * Every method that touches the tracking state is synchronized on this
 * tracker instance.
 */
class LazyWriteReplicaTracker {

  enum State {
    IN_MEMORY,
    LAZY_PERSIST_IN_PROGRESS,
    LAZY_PERSIST_COMPLETE,
  }

  /**
   * Tracking record for a single replica, identified by block pool ID and
   * block ID.
   */
  static class ReplicaState implements Comparable<ReplicaState> {

    final String bpid;
    final long blockId;
    State state;

    /**
     * transient storage volume that holds the original replica.
     */
    final FsVolumeSpi transientVolume;

    /**
     * Persistent volume that holds or will hold the saved replica.
     */
    FsVolumeImpl lazyPersistVolume;

    File savedMetaFile;
    File savedBlockFile;

    ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) {
      this.bpid = bpid;
      this.blockId = blockId;
      this.transientVolume = transientVolume;
      state = State.IN_MEMORY;
      lazyPersistVolume = null;
      savedMetaFile = null;
      savedBlockFile = null;
    }

    /**
     * Best-effort removal of the saved block and meta files. Each file is
     * attempted independently so that a failure on one does not prevent the
     * attempt on the other. Both references are cleared afterwards.
     */
    void deleteSavedFiles() {
      deleteQuietly(savedBlockFile);
      savedBlockFile = null;
      deleteQuietly(savedMetaFile);
      savedMetaFile = null;
    }

    /**
     * Delete the given file if it is non-null, ignoring any failure.
     */
    private static void deleteQuietly(File file) {
      if (file == null) {
        return;
      }
      try {
        file.delete();
      } catch (RuntimeException ignored) {
        // Deliberately ignored: cleanup of saved copies is best-effort.
      }
    }

    @Override
    public String toString() {
      return "[Bpid=" + bpid + ";blockId=" + blockId + "]";
    }

    @Override
    public int hashCode() {
      return bpid.hashCode() ^ (int) blockId;
    }

    @Override
    public boolean equals(Object other) {
      if (this == other) {
        return true;
      }

      if (other == null || getClass() != other.getClass()) {
        return false;
      }

      ReplicaState otherState = (ReplicaState) other;
      return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
    }

    /**
     * Orders by block ID and then by block pool ID, so that the ordering is
     * consistent with {@link #equals(Object)}: two states compare as 0 only
     * when both identifiers match.
     */
    @Override
    public int compareTo(ReplicaState other) {
      if (blockId != other.blockId) {
        return blockId < other.blockId ? -1 : 1;
      }
      return bpid.compareTo(other.bpid);
    }
  }

  final FsDatasetImpl fsDataset;

  /**
   * Map of blockpool ID to map of blockID to ReplicaState.
   */
  final Map<String, Map<Long, ReplicaState>> replicaMaps;

  /**
   * Queue of replicas that need to be written to disk.
   * Stale entries are GC'd by dequeueNextReplicaToPersist.
   */
  final Queue<ReplicaState> replicasNotPersisted;

  /**
   * Queue of replicas in the order in which they were persisted.
   * We'll dequeue them in the same order.
   * We can improve the eviction scheme later.
   * Stale entries are GC'd by getNextCandidateForEviction.
   */
  final Queue<ReplicaState> replicasPersisted;

  LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
    this.fsDataset = fsDataset;
    replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
    replicasNotPersisted = new LinkedList<ReplicaState>();
    replicasPersisted = new LinkedList<ReplicaState>();
  }

  /**
   * @return the tracked state for the replica, or null if either the block
   *         pool or the block is not being tracked.
   */
  private ReplicaState lookup(final String bpid, final long blockId) {
    final Map<Long, ReplicaState> map = replicaMaps.get(bpid);
    return (map == null) ? null : map.get(blockId);
  }

  /**
   * @return the tracked state for the replica.
   * @throws IllegalStateException if the replica is not being tracked.
   */
  private ReplicaState lookupOrThrow(final String bpid, final long blockId) {
    final ReplicaState replicaState = lookup(bpid, blockId);
    if (replicaState == null) {
      throw new IllegalStateException("Unknown replica bpid=" +
          bpid + "; blockId=" + blockId);
    }
    return replicaState;
  }

  /**
   * Start tracking a replica on transient storage and queue it to be saved.
   * Any state previously tracked for the same block pool ID and block ID is
   * replaced in the map.
   */
  synchronized void addReplica(String bpid, long blockId,
                               final FsVolumeSpi transientVolume) {
    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
    if (map == null) {
      map = new HashMap<Long, ReplicaState>();
      replicaMaps.put(bpid, map);
    }
    ReplicaState replicaState = new ReplicaState(bpid, blockId, transientVolume);
    map.put(blockId, replicaState);
    replicasNotPersisted.add(replicaState);
  }

  /**
   * Record that saving of the replica to the given volume has begun.
   *
   * @throws IllegalStateException if the replica is not being tracked.
   */
  synchronized void recordStartLazyPersist(
      final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
    ReplicaState replicaState = lookupOrThrow(bpid, blockId);
    replicaState.state = State.LAZY_PERSIST_IN_PROGRESS;
    replicaState.lazyPersistVolume = checkpointVolume;
  }

  /**
   * Record that the replica has been saved and move it from the
   * not-persisted queue to the persisted queue.
   *
   * @param bpid
   * @param blockId
   * @param savedFiles The saved meta and block files, in that order.
   * @throws IllegalStateException if the replica is not being tracked.
   */
  synchronized void recordEndLazyPersist(
      final String bpid, final long blockId, final File[] savedFiles) {
    ReplicaState replicaState = lookupOrThrow(bpid, blockId);
    replicaState.state = State.LAZY_PERSIST_COMPLETE;
    replicaState.savedMetaFile = savedFiles[0];
    replicaState.savedBlockFile = savedFiles[1];

    if (replicasNotPersisted.peek() == replicaState) {
      // Common case.
      replicasNotPersisted.remove();
    } else {
      // Should never occur in practice as lazy writer always persists
      // the replica at the head of the queue before moving to the next
      // one.
      replicasNotPersisted.remove(replicaState);
    }

    replicasPersisted.add(replicaState);
  }

  /**
   * @return the next queued replica that is still tracked, removing it (and
   *         any stale entries ahead of it) from the not-persisted queue;
   *         null if there is none.
   */
  synchronized ReplicaState dequeueNextReplicaToPersist() {
    ReplicaState replicaState;
    while ((replicaState = replicasNotPersisted.poll()) != null) {
      if (lookup(replicaState.bpid, replicaState.blockId) != null) {
        return replicaState;
      }
      // The replica no longer exists, look for the next one.
    }
    return null;
  }

  synchronized void reenqueueReplicaNotPersisted(final ReplicaState replicaState) {
    replicasNotPersisted.add(replicaState);
  }

  synchronized void reenqueueReplicaPersisted(final ReplicaState replicaState) {
    replicasPersisted.add(replicaState);
  }

  synchronized int numReplicasNotPersisted() {
    return replicasNotPersisted.size();
  }

  /**
   * @return the oldest persisted replica that is still tracked, removing it
   *         (and any stale entries ahead of it) from the persisted queue;
   *         null if there is none.
   */
  synchronized ReplicaState getNextCandidateForEviction() {
    ReplicaState replicaState;
    while ((replicaState = replicasPersisted.poll()) != null) {
      if (lookup(replicaState.bpid, replicaState.blockId) != null) {
        return replicaState;
      }
      // The replica no longer exists, look for the next one.
    }
    return null;
  }

  void discardReplica(ReplicaState replicaState, boolean deleteSavedCopies) {
    discardReplica(replicaState.bpid, replicaState.blockId, deleteSavedCopies);
  }

  /**
   * Discard any state we are tracking for the given replica. This could mean
   * the block is either deleted from the block space or the replica is no longer
   * on transient storage.
   *
   * @param deleteSavedCopies true if we should delete the saved copies on
   *                          persistent storage. This should be set by the
   *                          caller when the block is no longer needed.
   */
  synchronized void discardReplica(
      final String bpid, final long blockId,
      boolean deleteSavedCopies) {
    Map<Long, ReplicaState> map = replicaMaps.get(bpid);

    if (map == null) {
      return;
    }

    ReplicaState replicaState = map.get(blockId);

    if (replicaState == null) {
      return;
    }

    if (deleteSavedCopies) {
      replicaState.deleteSavedFiles();
    }
    // Queue entries are not removed here; they are skipped when dequeued.
    map.remove(blockId);
  }
}

View File

@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.collect.TreeMultimap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.File;
import java.util.*;
/**
 * An implementation of RamDiskReplicaTracker that uses an LRU
 * eviction scheme.
 *
 * Replicas that have not yet been saved to persistent storage wait in a FIFO
 * queue. Once saved, they are indexed by their last use time and the entry
 * with the smallest timestamp is offered first as an eviction candidate.
 *
 * Every overridden tracker operation is synchronized on this instance.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {

  /**
   * A tracked replica together with the wall-clock time (milliseconds) at
   * which it was last persisted or touched. Declared static because it never
   * needs a reference to the enclosing tracker.
   */
  private static class RamDiskReplicaLru extends RamDiskReplica {
    long lastUsedTime;

    private RamDiskReplicaLru(String bpid, long blockId, FsVolumeImpl ramDiskVolume) {
      super(bpid, blockId, ramDiskVolume);
    }
  }

  /**
   * Map of blockpool ID to a map of blockID to tracked replica.
   */
  final Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps;

  /**
   * Queue of replicas that need to be written to disk.
   * Stale entries are GC'd by dequeueNextReplicaToPersist.
   */
  final Queue<RamDiskReplicaLru> replicasNotPersisted;

  /**
   * Map of persisted replicas ordered by their last use times.
   */
  final TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;

  RamDiskReplicaLruTracker() {
    replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>();
    replicasNotPersisted = new LinkedList<RamDiskReplicaLru>();
    replicasPersisted = TreeMultimap.create();
  }

  /**
   * @return the tracked replica, or null if either the block pool or the
   *         block is not being tracked.
   */
  private RamDiskReplicaLru lookup(final String bpid, final long blockId) {
    final Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
    return (map == null) ? null : map.get(blockId);
  }

  /**
   * @return the tracked replica.
   * @throws IllegalStateException if the replica is not being tracked.
   */
  private RamDiskReplicaLru lookupOrThrow(final String bpid, final long blockId) {
    final RamDiskReplicaLru replica = lookup(bpid, blockId);
    if (replica == null) {
      throw new IllegalStateException("Unknown replica bpid=" +
          bpid + "; blockId=" + blockId);
    }
    return replica;
  }

  /**
   * Start tracking a replica on transient storage and queue it to be saved.
   * Any replica previously tracked under the same block pool ID and block ID
   * is replaced in the map.
   */
  @Override
  synchronized void addReplica(final String bpid, final long blockId,
                               final FsVolumeImpl transientVolume) {
    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
    if (map == null) {
      map = new HashMap<Long, RamDiskReplicaLru>();
      replicaMaps.put(bpid, map);
    }
    RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume);
    map.put(blockId, ramDiskReplicaLru);
    replicasNotPersisted.add(ramDiskReplicaLru);
  }

  /**
   * Refresh the last use time of a persisted replica. Does nothing if the
   * replica is not tracked (including when its block pool is unknown) or if
   * it has not been persisted yet.
   */
  @Override
  synchronized void touch(final String bpid,
                          final long blockId) {
    final RamDiskReplicaLru ramDiskReplicaLru = lookup(bpid, blockId);

    if (ramDiskReplicaLru == null) {
      return;
    }

    // Reinsert the replica with its new timestamp. The removal only succeeds
    // for replicas that are currently indexed as persisted.
    if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
      ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
      replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
    }
  }

  /**
   * Record the persistent volume that the replica is about to be saved to.
   *
   * @throws IllegalStateException if the replica is not being tracked.
   */
  @Override
  synchronized void recordStartLazyPersist(
      final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
    lookupOrThrow(bpid, blockId).setLazyPersistVolume(checkpointVolume);
  }

  /**
   * Record that the replica has been saved, remove it from the
   * not-persisted queue and index it by the current time for eviction.
   *
   * @param savedFiles the saved files, passed through to
   *                   recordSavedBlockFiles.
   * @throws IllegalStateException if the replica is not being tracked.
   */
  @Override
  synchronized void recordEndLazyPersist(
      final String bpid, final long blockId, final File[] savedFiles) {
    final RamDiskReplicaLru ramDiskReplicaLru = lookupOrThrow(bpid, blockId);
    ramDiskReplicaLru.recordSavedBlockFiles(savedFiles);

    if (replicasNotPersisted.peek() == ramDiskReplicaLru) {
      // Common case.
      replicasNotPersisted.remove();
    } else {
      // Caller error? Fallback to O(n) removal.
      replicasNotPersisted.remove(ramDiskReplicaLru);
    }

    ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
    replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
  }

  /**
   * @return the next queued replica that is still tracked, removing it (and
   *         any stale entries ahead of it) from the not-persisted queue;
   *         null if there is none.
   */
  @Override
  synchronized RamDiskReplicaLru dequeueNextReplicaToPersist() {
    RamDiskReplicaLru ramDiskReplicaLru;
    while ((ramDiskReplicaLru = replicasNotPersisted.poll()) != null) {
      if (lookup(ramDiskReplicaLru.getBlockPoolId(),
                 ramDiskReplicaLru.getBlockId()) != null) {
        return ramDiskReplicaLru;
      }
      // The replica no longer exists, look for the next one.
    }
    return null;
  }

  @Override
  synchronized void reenqueueReplicaNotPersisted(final RamDiskReplica ramDiskReplicaLru) {
    replicasNotPersisted.add((RamDiskReplicaLru) ramDiskReplicaLru);
  }

  @Override
  synchronized int numReplicasNotPersisted() {
    return replicasNotPersisted.size();
  }

  /**
   * @return the persisted replica with the smallest last use time that is
   *         still tracked, removing it (and any stale entries ahead of it)
   *         from the persisted index; null if there is none.
   */
  @Override
  synchronized RamDiskReplicaLru getNextCandidateForEviction() {
    final Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
    while (it.hasNext()) {
      final RamDiskReplicaLru ramDiskReplicaLru = it.next();
      it.remove();

      if (lookup(ramDiskReplicaLru.getBlockPoolId(),
                 ramDiskReplicaLru.getBlockId()) != null) {
        return ramDiskReplicaLru;
      }

      // The replica no longer exists, look for the next one.
    }
    return null;
  }

  /**
   * Discard any state we are tracking for the given replica. This could mean
   * the block is either deleted from the block space or the replica is no longer
   * on transient storage.
   *
   * @param deleteSavedCopies true if we should delete the saved copies on
   *                          persistent storage. This should be set by the
   *                          caller when the block is no longer needed.
   */
  @Override
  synchronized void discardReplica(
      final String bpid, final long blockId,
      boolean deleteSavedCopies) {
    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);

    if (map == null) {
      return;
    }

    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);

    if (ramDiskReplicaLru == null) {
      return;
    }

    if (deleteSavedCopies) {
      ramDiskReplicaLru.deleteSavedFiles();
    }

    map.remove(blockId);
    replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);

    // replicasNotPersisted will be lazily GC'ed.
  }
}

View File

@ -0,0 +1,245 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.File;
/**
 * Base class for the component that tracks replicas stored on RAM disk and
 * decides which replica to persist or evict next. The concrete subclass is
 * selected through configuration, see
 * {@link #getInstance(Configuration, FsDatasetImpl)}.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class RamDiskReplicaTracker {

  /** Dataset this tracker belongs to; assigned by {@link #initialize}. */
  FsDatasetImpl fsDataset;

  /**
   * Per-replica bookkeeping record. Identity ({@link #equals},
   * {@link #hashCode} and {@link #compareTo}) is defined by the block pool ID
   * and block ID only.
   */
  static class RamDiskReplica implements Comparable<RamDiskReplica> {
    private final String bpid;
    private final long blockId;
    private File savedBlockFile;
    private File savedMetaFile;

    /**
     * RAM_DISK volume that holds the original replica.
     */
    final FsVolumeSpi ramDiskVolume;

    /**
     * Persistent volume that holds or will hold the saved replica.
     */
    FsVolumeImpl lazyPersistVolume;

    /**
     * Create a record for a replica that has no saved copy yet.
     *
     * @param bpid block pool ID of the replica.
     * @param blockId block ID of the replica.
     * @param ramDiskVolume RAM_DISK volume that holds the replica.
     */
    RamDiskReplica(final String bpid, final long blockId,
                   final FsVolumeImpl ramDiskVolume) {
      this.bpid = bpid;
      this.blockId = blockId;
      this.ramDiskVolume = ramDiskVolume;
      lazyPersistVolume = null;
      savedMetaFile = null;
      savedBlockFile = null;
    }

    long getBlockId() {
      return blockId;
    }

    String getBlockPoolId() {
      return bpid;
    }

    FsVolumeImpl getLazyPersistVolume() {
      return lazyPersistVolume;
    }

    /**
     * Set the volume that holds or will hold the saved replica.
     *
     * @param volume the target volume; must not be transient storage.
     * @throws IllegalStateException if the volume reports itself as
     *                               transient storage.
     */
    void setLazyPersistVolume(FsVolumeImpl volume) {
      Preconditions.checkState(!volume.isTransientStorage());
      this.lazyPersistVolume = volume;
    }

    File getSavedBlockFile() {
      return savedBlockFile;
    }

    File getSavedMetaFile() {
      return savedMetaFile;
    }

    /**
     * Record the saved meta and block files on the given volume.
     *
     * @param files Meta and block files, in that order. The array must hold
     *              at least two elements.
     */
    void recordSavedBlockFiles(File[] files) {
      this.savedMetaFile = files[0];
      this.savedBlockFile = files[1];
    }

    @Override
    public int hashCode() {
      return bpid.hashCode() ^ (int) blockId;
    }

    @Override
    public boolean equals(Object other) {
      if (this == other) {
        return true;
      }

      if (other == null || getClass() != other.getClass()) {
        return false;
      }

      RamDiskReplica otherState = (RamDiskReplica) other;
      return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
    }

    /**
     * Delete the saved meta and block files. Failure to delete can be
     * ignored, the directory scanner will retry the deletion later.
     *
     * The two deletions are attempted independently, so a failure on the
     * block file does not prevent the attempt on the meta file. A file
     * reference is cleared once its deletion attempt completed without an
     * exception, regardless of the result reported by the file system.
     */
    void deleteSavedFiles() {
      if (tryDelete(savedBlockFile)) {
        savedBlockFile = null;
      }
      if (tryDelete(savedMetaFile)) {
        savedMetaFile = null;
      }
    }

    /**
     * Best-effort deletion of a single file.
     *
     * @param file the file to delete, may be null.
     * @return true if there was nothing to delete or the delete call
     *         returned normally; false if it threw a runtime exception.
     */
    private static boolean tryDelete(File file) {
      if (file == null) {
        return true;
      }
      try {
        // The boolean result is intentionally not checked: an undeleted
        // file is left for the directory scanner to clean up.
        file.delete();
        return true;
      } catch (RuntimeException ignored) {
        // Deliberately ignored (e.g. SecurityException); deletion is
        // best-effort. JVM errors are not caught and propagate.
        return false;
      }
    }

    /**
     * Order by block pool ID first, then by block ID.
     */
    @Override
    public int compareTo(RamDiskReplica other) {
      final int bpidResult = bpid.compareTo(other.bpid);
      if (bpidResult != 0) {
        return bpidResult;
      }
      if (blockId == other.blockId) {
        return 0;
      }
      return (blockId < other.blockId) ? -1 : 1;
    }

    @Override
    public String toString() {
      return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]";
    }
  }

  /**
   * Get an instance of the configured RamDiskReplicaTracker based on the
   * configuration property
   * {@link org.apache.hadoop.hdfs.DFSConfigKeys#DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY}.
   * The tracker class is instantiated reflectively and then handed the
   * dataset through {@link #initialize(FsDatasetImpl)}.
   *
   * @param conf the configuration to be used
   * @param fsDataset the FsDataset object.
   * @return an instance of RamDiskReplicaTracker
   */
  static RamDiskReplicaTracker getInstance(final Configuration conf,
                                           final FsDatasetImpl fsDataset) {
    final Class<? extends RamDiskReplicaTracker> trackerClass = conf.getClass(
        DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY,
        DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT,
        RamDiskReplicaTracker.class);
    final RamDiskReplicaTracker tracker = ReflectionUtils.newInstance(
        trackerClass, conf);
    tracker.initialize(fsDataset);
    return tracker;
  }

  /**
   * Attach the tracker to its dataset. Called by
   * {@link #getInstance(Configuration, FsDatasetImpl)} right after the
   * tracker has been constructed.
   *
   * @param fsDataset the dataset that owns this tracker.
   */
  void initialize(final FsDatasetImpl fsDataset) {
    this.fsDataset = fsDataset;
  }

  /**
   * Start tracking a new finalized replica on RAM disk.
   *
   * @param bpid block pool ID of the replica.
   * @param blockId block ID of the replica.
   * @param transientVolume RAM disk volume that stores the replica.
   */
  abstract void addReplica(final String bpid, final long blockId,
                           final FsVolumeImpl transientVolume);

  /**
   * Invoked when a replica is opened by a client. This may be used as
   * a heuristic by the eviction scheme.
   *
   * @param bpid block pool ID of the replica.
   * @param blockId block ID of the replica.
   */
  abstract void touch(final String bpid, final long blockId);

  /**
   * Get the next replica to write to persistent storage.
   *
   * @return the next replica to persist; implementations may return
   *         {@code null} when nothing is pending.
   */
  abstract RamDiskReplica dequeueNextReplicaToPersist();

  /**
   * Invoked if a replica that was previously dequeued for persistence
   * could not be successfully persisted. Add it back so it can be retried
   * later.
   *
   * @param ramDiskReplica the replica to queue again.
   */
  abstract void reenqueueReplicaNotPersisted(
      final RamDiskReplica ramDiskReplica);

  /**
   * Invoked when the Lazy persist operation is started by the DataNode.
   *
   * @param bpid block pool ID of the replica.
   * @param blockId block ID of the replica.
   * @param checkpointVolume volume supplied by the caller for this lazy
   *                         persist operation; presumably the persistent
   *                         volume that will receive the saved copy
   *                         (TODO confirm against callers).
   */
  abstract void recordStartLazyPersist(
      final String bpid, final long blockId, FsVolumeImpl checkpointVolume);

  /**
   * Invoked when the Lazy persist operation is complete.
   *
   * @param bpid block pool ID of the replica.
   * @param blockId block ID of the replica.
   * @param savedFiles The saved meta and block files, in that order.
   */
  abstract void recordEndLazyPersist(
      final String bpid, final long blockId, final File[] savedFiles);

  /**
   * Return a candidate replica to remove from RAM Disk. The exact replica
   * to be returned may depend on the eviction scheme utilized.
   *
   * @return the replica chosen for eviction; implementations may return
   *         {@code null} when there is no candidate.
   */
  abstract RamDiskReplica getNextCandidateForEviction();

  /**
   * Return the number of replicas pending persistence to disk.
   */
  abstract int numReplicasNotPersisted();

  /**
   * Discard all state we are tracking for the given replica.
   *
   * @param bpid block pool ID of the replica.
   * @param blockId block ID of the replica.
   * @param deleteSavedCopies true if the saved copies of the replica should
   *                          be deleted as well.
   */
  abstract void discardReplica(
      final String bpid, final long blockId,
      boolean deleteSavedCopies);

  /**
   * Convenience overload that discards the given replica by its block pool
   * ID and block ID.
   *
   * @param replica the replica to discard.
   * @param deleteSavedCopies true if the saved copies of the replica should
   *                          be deleted as well.
   */
  void discardReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
    discardReplica(replica.getBlockPoolId(), replica.getBlockId(), deleteSavedCopies);
  }
}

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
public class FsDatasetTestUtil {
public static File getFile(FsDatasetSpi<?> fsd, String bpid, long bid) {
return ((FsDatasetImpl)fsd).getFile(bpid, bid);
return ((FsDatasetImpl)fsd).getFile(bpid, bid, false);
}
public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b

View File

@ -71,7 +71,7 @@ public class TestLazyPersistFiles {
private static final int THREADPOOL_SIZE = 10;
private static final short REPL_FACTOR = 1;
private static final int BLOCK_SIZE = 10485760; // 10 MB
private static final int BLOCK_SIZE = 5 * 1024 * 1024;
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
@ -449,34 +449,51 @@ public class TestLazyPersistFiles {
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testRamDiskEvictionLRU()
public void testRamDiskEvictionIsLru()
throws IOException, InterruptedException {
startUpCluster(true, 3);
final int NUM_PATHS = 5;
startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int NUM_PATHS = 6;
Path paths[] = new Path[NUM_PATHS];
Path paths[] = new Path[NUM_PATHS * 2];
for (int i = 0; i < NUM_PATHS; i++) {
for (int i = 0; i < paths.length; i++) {
paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
}
// No eviction for the first half of files
for (int i = 0; i < NUM_PATHS/2; i++) {
for (int i = 0; i < NUM_PATHS; i++) {
makeTestFile(paths[i], BLOCK_SIZE, true);
}
// Sleep for a short time to allow the lazy writer thread to do its job.
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
for (int i = 0; i < NUM_PATHS; ++i) {
ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
}
// Lazy persist writer persists the first half of files
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
// Open the files for read in a random order.
ArrayList<Integer> indexes = new ArrayList<Integer>(NUM_PATHS);
for (int i = 0; i < NUM_PATHS; ++i) {
indexes.add(i);
}
Collections.shuffle(indexes);
// Create the second half of files with eviction upon each create.
for (int i = NUM_PATHS/2; i < NUM_PATHS; i++) {
makeTestFile(paths[i], BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
for (int i = 0; i < NUM_PATHS; ++i) {
LOG.info("Touching file " + paths[indexes.get(i)]);
DFSTestUtil.readFile(fs, paths[indexes.get(i)]);
}
// path[i-NUM_PATHS/2] is expected to be evicted by LRU
// Create an equal number of new files ensuring that the previous
// files are evicted in the same order they were read.
for (int i = 0; i < NUM_PATHS; ++i) {
makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
triggerBlockReport();
ensureFileReplicasOnStorageType(paths[i - NUM_PATHS / 2], DEFAULT);
Thread.sleep(3000);
ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
for (int j = i + 1; j < NUM_PATHS; ++j) {
ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
}
}
}