HDFS-6926. DN support for saving replicas to persistent storage and evicting in-memory replicas. (Arpit Agarwal)
This commit is contained in:
parent
a317bd7b02
commit
eb448e1439
|
@ -13,3 +13,6 @@
|
|||
HDFS-6925. DataNode should attempt to place replicas on transient storage
|
||||
first if lazyPersist flag is received. (Arpit Agarwal)
|
||||
|
||||
HDFS-6926. DN support for saving replicas to persistent storage and
|
||||
evicting in-memory replicas. (Arpit Agarwal)
|
||||
|
||||
|
|
|
@ -123,6 +123,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final long DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
|
||||
public static final String DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
|
||||
public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
|
||||
public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
|
||||
public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
|
||||
public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
|
||||
"dfs.namenode.path.based.cache.block.map.allocation.percent";
|
||||
public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
|
||||
|
@ -227,6 +229,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final float DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f;
|
||||
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS = "dfs.namenode.edit.log.autoroll.check.interval.ms";
|
||||
public static final int DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT = 5*60*1000;
|
||||
|
||||
public static final String DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC = "dfs.namenode.lazypersist.file.scrub.interval.sec";
|
||||
public static final int DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT = 5 * 60;
|
||||
|
||||
public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush";
|
||||
public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;
|
||||
|
|
|
@ -191,7 +191,7 @@ class BlockPoolSliceScanner {
|
|||
+ hours + " hours for block pool " + bpid);
|
||||
|
||||
// get the list of blocks and arrange them in random order
|
||||
List<FinalizedReplica> arr = dataset.getFinalizedBlocks(blockPoolId);
|
||||
List<FinalizedReplica> arr = dataset.getFinalizedBlocksOnPersistentStorage(blockPoolId);
|
||||
Collections.shuffle(arr);
|
||||
|
||||
long scanTime = -1;
|
||||
|
|
|
@ -81,6 +81,7 @@ public class DataStorage extends Storage {
|
|||
final static String STORAGE_DIR_DETACHED = "detach";
|
||||
public final static String STORAGE_DIR_RBW = "rbw";
|
||||
public final static String STORAGE_DIR_FINALIZED = "finalized";
|
||||
public final static String STORAGE_DIR_LAZY_PERSIST = "lazypersist";
|
||||
public final static String STORAGE_DIR_TMP = "tmp";
|
||||
|
||||
// Set of bpids for which 'trash' is currently enabled.
|
||||
|
|
|
@ -225,7 +225,7 @@ public class ReplicaInPipeline extends ReplicaInfo
|
|||
}
|
||||
}
|
||||
} else {
|
||||
// for create, we can use the requested checksum
|
||||
// for create, we can use the requested checksum
|
||||
checksum = requestedChecksum;
|
||||
}
|
||||
|
||||
|
|
|
@ -138,8 +138,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
|
|||
if (mostAvailableAmongLowVolumes < replicaSize ||
|
||||
random.nextFloat() < scaledPreferencePercent) {
|
||||
volume = roundRobinPolicyHighAvailable.chooseVolume(
|
||||
highAvailableVolumes,
|
||||
replicaSize);
|
||||
highAvailableVolumes, replicaSize);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Volumes are imbalanced. Selecting " + volume +
|
||||
" from high available space volumes for write of block size "
|
||||
|
@ -147,8 +146,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
|
|||
}
|
||||
} else {
|
||||
volume = roundRobinPolicyLowAvailable.chooseVolume(
|
||||
lowAvailableVolumes,
|
||||
replicaSize);
|
||||
lowAvailableVolumes, replicaSize);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Volumes are imbalanced. Selecting " + volume +
|
||||
" from low available space volumes for write of block size "
|
||||
|
|
|
@ -113,6 +113,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
/** @return a list of finalized blocks for the given block pool. */
|
||||
public List<FinalizedReplica> getFinalizedBlocks(String bpid);
|
||||
|
||||
/** @return a list of finalized blocks for the given block pool. */
|
||||
public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid);
|
||||
|
||||
/**
|
||||
* Check whether the in-memory block record matches the block on the disk,
|
||||
* and, in case that they are not matched, update the record or mark it
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.io.InputStream;
|
|||
import java.io.RandomAccessFile;
|
||||
import java.util.Scanner;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.DU;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
|
@ -61,6 +62,7 @@ class BlockPoolSlice {
|
|||
private final File currentDir; // StorageDirectory/current/bpid/current
|
||||
// directory where finalized replicas are stored
|
||||
private final File finalizedDir;
|
||||
private final File lazypersistDir;
|
||||
private final File rbwDir; // directory store RBW replica
|
||||
private final File tmpDir; // directory store Temporary replica
|
||||
private static final String DU_CACHE_FILE = "dfsUsed";
|
||||
|
@ -85,12 +87,24 @@ class BlockPoolSlice {
|
|||
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
|
||||
this.finalizedDir = new File(
|
||||
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
|
||||
this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
|
||||
if (!this.finalizedDir.exists()) {
|
||||
if (!this.finalizedDir.mkdirs()) {
|
||||
throw new IOException("Failed to mkdirs " + this.finalizedDir);
|
||||
}
|
||||
}
|
||||
|
||||
// Delete all checkpointed replicas on startup.
|
||||
// TODO: We can move checkpointed replicas to the finalized dir and delete
|
||||
// the copy on RAM_DISK. For now we take the simpler approach.
|
||||
|
||||
FileUtil.fullyDelete(lazypersistDir);
|
||||
if (!this.lazypersistDir.exists()) {
|
||||
if (!this.lazypersistDir.mkdirs()) {
|
||||
throw new IOException("Failed to mkdirs " + this.lazypersistDir);
|
||||
}
|
||||
}
|
||||
|
||||
// Files that were being written when the datanode was last shutdown
|
||||
// are now moved back to the data directory. It is possible that
|
||||
// in the future, we might want to do some sort of datanode-local
|
||||
|
@ -136,6 +150,10 @@ class BlockPoolSlice {
|
|||
return finalizedDir;
|
||||
}
|
||||
|
||||
File getLazypersistDir() {
|
||||
return lazypersistDir;
|
||||
}
|
||||
|
||||
File getRbwDir() {
|
||||
return rbwDir;
|
||||
}
|
||||
|
@ -252,12 +270,37 @@ class BlockPoolSlice {
|
|||
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
|
||||
return blockFile;
|
||||
}
|
||||
|
||||
|
||||
File lazyPersistReplica(Block b, File f) throws IOException {
|
||||
File blockFile = FsDatasetImpl.copyBlockFiles(b, f, lazypersistDir);
|
||||
File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
|
||||
dfsUsage.incDfsUsed(b.getNumBytes() + metaFile.length());
|
||||
return blockFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* Move a persisted replica from lazypersist directory to a subdirectory
|
||||
* under finalized.
|
||||
*/
|
||||
File activateSavedReplica(Block b, File blockFile) throws IOException {
|
||||
final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
|
||||
final File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
|
||||
final File targetBlockFile = new File(blockDir, blockFile.getName());
|
||||
final File targetMetaFile = new File(blockDir, metaFile.getName());
|
||||
FileUtils.moveFile(blockFile, targetBlockFile);
|
||||
FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
|
||||
FileUtils.moveFile(metaFile, targetMetaFile);
|
||||
FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
|
||||
return targetBlockFile;
|
||||
}
|
||||
|
||||
void checkDirs() throws DiskErrorException {
|
||||
DiskChecker.checkDirs(finalizedDir);
|
||||
DiskChecker.checkDir(tmpDir);
|
||||
DiskChecker.checkDir(rbwDir);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void getVolumeMap(ReplicaMap volumeMap) throws IOException {
|
||||
// add finalized replicas
|
||||
|
|
|
@ -27,12 +27,7 @@ import java.io.InputStream;
|
|||
import java.io.RandomAccessFile;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
|
@ -88,6 +83,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
|
@ -111,7 +107,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public List<FsVolumeImpl> getVolumes() {
|
||||
return volumes.volumes;
|
||||
|
@ -204,11 +199,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final FsVolumeList volumes;
|
||||
final Map<String, DatanodeStorage> storageMap;
|
||||
final FsDatasetAsyncDiskService asyncDiskService;
|
||||
final Daemon lazyWriter;
|
||||
final FsDatasetCache cacheManager;
|
||||
private final Configuration conf;
|
||||
private final int validVolsRequired;
|
||||
private volatile boolean fsRunning;
|
||||
|
||||
final ReplicaMap volumeMap;
|
||||
final LazyWriteReplicaTracker lazyWriteReplicaTracker;
|
||||
|
||||
private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
|
||||
|
||||
|
||||
// Used for synchronizing access to usage stats
|
||||
private final Object statsLock = new Object();
|
||||
|
@ -218,6 +219,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
*/
|
||||
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
|
||||
) throws IOException {
|
||||
this.fsRunning = true;
|
||||
this.datanode = datanode;
|
||||
this.dataStorage = storage;
|
||||
this.conf = conf;
|
||||
|
@ -248,6 +250,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
storageMap = new HashMap<String, DatanodeStorage>();
|
||||
volumeMap = new ReplicaMap(this);
|
||||
lazyWriteReplicaTracker = new LazyWriteReplicaTracker(this);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
|
||||
ReflectionUtils.newInstance(conf.getClass(
|
||||
|
@ -257,11 +261,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
volumes = new FsVolumeList(volsFailed, blockChooserImpl);
|
||||
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
|
||||
|
||||
// TODO: Initialize transientReplicaTracker from blocks on disk.
|
||||
|
||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
||||
addVolume(dataLocations, storage.getStorageDir(idx));
|
||||
}
|
||||
|
||||
cacheManager = new FsDatasetCache(this);
|
||||
lazyWriter = new Daemon(new LazyWriter(
|
||||
conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
||||
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
|
||||
lazyWriter.start();
|
||||
registerMBean(datanode.getDatanodeUuid());
|
||||
}
|
||||
|
||||
|
@ -531,8 +541,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD());
|
||||
}
|
||||
|
||||
static File moveBlockFiles(Block b, File srcfile, File destdir
|
||||
) throws IOException {
|
||||
static File moveBlockFiles(Block b, File srcfile, File destdir)
|
||||
throws IOException {
|
||||
final File dstfile = new File(destdir, b.getBlockName());
|
||||
final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
|
||||
final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
|
||||
|
@ -555,6 +565,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
return dstfile;
|
||||
}
|
||||
|
||||
static File copyBlockFiles(Block b, File srcfile, File destdir)
|
||||
throws IOException {
|
||||
final File dstfile = new File(destdir, b.getBlockName());
|
||||
final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
|
||||
final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
|
||||
try {
|
||||
FileUtils.copyFile(srcmeta, dstmeta);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Failed to copy meta file for " + b
|
||||
+ " from " + srcmeta + " to " + dstmeta, e);
|
||||
}
|
||||
try {
|
||||
FileUtils.copyFile(srcfile, dstfile);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Failed to copy block file for " + b
|
||||
+ " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
|
||||
+ " and " + srcfile + " to " + dstfile);
|
||||
}
|
||||
return dstfile;
|
||||
}
|
||||
|
||||
static private void truncateBlock(File blockFile, File metaFile,
|
||||
long oldlen, long newlen) throws IOException {
|
||||
LOG.info("truncateBlock: blockFile=" + blockFile
|
||||
|
@ -817,6 +851,83 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to evict one or more transient block replicas we have at least
|
||||
* spaceNeeded bytes free.
|
||||
*
|
||||
* @return true if we were able to free up at least spaceNeeded bytes, false
|
||||
* otherwise.
|
||||
*/
|
||||
private boolean tryToEvictBlocks(final String bpid, final long spaceNeeded)
|
||||
throws IOException {
|
||||
|
||||
boolean isAvailable = false;
|
||||
|
||||
LOG.info("Attempting to evict blocks from transient storage");
|
||||
|
||||
// Reverse the map so we can iterate in order of replica creation times,
|
||||
// evicting oldest replicas one at a time until we have sufficient space.
|
||||
TreeMultimap<Long, LazyWriteReplicaTracker.ReplicaState> lruMap =
|
||||
lazyWriteReplicaTracker.getLruMap();
|
||||
int blocksEvicted = 0;
|
||||
|
||||
// TODO: It is really inefficient to do this with the Object lock held!
|
||||
// TODO: This logic is here just for prototyping.
|
||||
// TODO: We should replace it with proactive discard when ram_disk free space
|
||||
// TODO: falls below a low watermark. That way we avoid fs operations on the
|
||||
// TODO: hot path with the lock held.
|
||||
synchronized (this) {
|
||||
long currentTime = System.currentTimeMillis() / 1000;
|
||||
for (Map.Entry<Long, LazyWriteReplicaTracker.ReplicaState> entry : lruMap.entries()) {
|
||||
LazyWriteReplicaTracker.ReplicaState lazyWriteReplica = entry.getValue();
|
||||
LOG.info("RAM_DISK: Evicting blockId=" + lazyWriteReplica.blockId +
|
||||
"; block LMT=" + entry.getKey() +
|
||||
"; currentTime=" + currentTime);
|
||||
ReplicaInfo replicaInfo = getReplicaInfo(bpid, lazyWriteReplica.blockId);
|
||||
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
|
||||
File blockFile = replicaInfo.getBlockFile();
|
||||
File metaFile = replicaInfo.getMetaFile();
|
||||
long used = blockFile.length() + metaFile.length();
|
||||
lazyWriteReplicaTracker.discardReplica(bpid, entry.getValue().blockId, false);
|
||||
|
||||
// Move the persisted replica to the finalized directory of
|
||||
// the target volume.
|
||||
BlockPoolSlice bpSlice =
|
||||
lazyWriteReplica.lazyPersistVolume.getBlockPoolSlice(bpid);
|
||||
File newBlockFile = bpSlice.activateSavedReplica(
|
||||
replicaInfo, lazyWriteReplica.savedBlockFile);
|
||||
|
||||
ReplicaInfo newReplicaInfo =
|
||||
new FinalizedReplica(replicaInfo.getBlockId(),
|
||||
replicaInfo.getBytesOnDisk(),
|
||||
replicaInfo.getGenerationStamp(),
|
||||
lazyWriteReplica.lazyPersistVolume,
|
||||
newBlockFile.getParentFile());
|
||||
|
||||
// Update the volumeMap entry. This removes the old entry.
|
||||
volumeMap.add(bpid, newReplicaInfo);
|
||||
|
||||
// Remove the old replicas.
|
||||
blockFile.delete();
|
||||
metaFile.delete();
|
||||
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, used);
|
||||
++blocksEvicted;
|
||||
|
||||
if (replicaInfo.getVolume().getAvailable() > spaceNeeded) {
|
||||
LOG.info("RAM_DISK: freed up " + spaceNeeded + " bytes for new block");
|
||||
isAvailable = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (blocksEvicted == MAX_BLOCK_EVICTIONS_PER_ITERATION) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return isAvailable;
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
|
||||
ExtendedBlock b, boolean allowLazyPersist) throws IOException {
|
||||
|
@ -839,7 +950,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
} catch (DiskOutOfSpaceException de) {
|
||||
if (allowLazyPersist) {
|
||||
allowLazyPersist = false;
|
||||
if (!tryToEvictBlocks(b.getBlockPoolId(), b.getNumBytes())) {
|
||||
// Eviction did not work, we'll just fallback to DEFAULT storage.
|
||||
LOG.info("RAM_DISK: Failed to free up " + b.getNumBytes() +
|
||||
" bytes for new block. Will fallback to DEFAULT " +
|
||||
"storage");
|
||||
allowLazyPersist = false;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
throw de;
|
||||
|
@ -851,6 +968,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
|
||||
b.getGenerationStamp(), v, f.getParentFile());
|
||||
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
||||
|
||||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
|
@ -988,7 +1106,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
|
||||
b.getGenerationStamp(), v, f.getParentFile());
|
||||
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
||||
|
||||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
|
@ -1054,8 +1171,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
File dest = v.addBlock(bpid, replicaInfo, f);
|
||||
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
||||
|
||||
if (v.isTransientStorage()) {
|
||||
lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
|
||||
|
||||
// Schedule a checkpoint.
|
||||
((LazyWriter) lazyWriter.getRunnable())
|
||||
.addReplicaToLazyWriteQueue(bpid, replicaInfo.getBlockId());
|
||||
}
|
||||
}
|
||||
volumeMap.add(bpid, newReplicaInfo);
|
||||
|
||||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
|
@ -1075,6 +1201,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
replicaInfo.getMetaFile(), b.getLocalBlock())) {
|
||||
LOG.warn("Block " + b + " unfinalized and removed. " );
|
||||
}
|
||||
if (replicaInfo.getVolume().isTransientStorage()) {
|
||||
lazyWriteReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1170,6 +1299,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
return finalized;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of finalized blocks from in-memory blockmap for a block pool.
|
||||
*/
|
||||
@Override
|
||||
public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
|
||||
ArrayList<FinalizedReplica> finalized =
|
||||
new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
|
||||
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
||||
if(!b.getVolume().isTransientStorage() &&
|
||||
b.getState() == ReplicaState.FINALIZED) {
|
||||
finalized.add(new FinalizedReplica((FinalizedReplica)b));
|
||||
}
|
||||
}
|
||||
return finalized;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the given block is a valid one.
|
||||
* valid means finalized
|
||||
|
@ -1287,6 +1432,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
volumeMap.remove(bpid, invalidBlks[i]);
|
||||
}
|
||||
|
||||
if (v.isTransientStorage()) {
|
||||
lazyWriteReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
|
||||
}
|
||||
|
||||
// If a DFSClient has the replica in its cache of short-circuit file
|
||||
// descriptors (and the client is using ShortCircuitShm), invalidate it.
|
||||
datanode.getShortCircuitRegistry().processBlockInvalidation(
|
||||
|
@ -1482,8 +1631,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
@Override // FsDatasetSpi
|
||||
public void shutdown() {
|
||||
if (mbeanName != null)
|
||||
fsRunning = false;
|
||||
|
||||
((LazyWriter) lazyWriter.getRunnable()).stop();
|
||||
lazyWriter.interrupt();
|
||||
|
||||
if (mbeanName != null) {
|
||||
MBeans.unregister(mbeanName);
|
||||
}
|
||||
|
||||
if (asyncDiskService != null) {
|
||||
asyncDiskService.shutdown();
|
||||
|
@ -1492,6 +1647,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
if(volumes != null) {
|
||||
volumes.shutdown();
|
||||
}
|
||||
|
||||
try {
|
||||
lazyWriter.join();
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
|
||||
"from LazyWriter.join");
|
||||
}
|
||||
}
|
||||
|
||||
@Override // FSDatasetMBean
|
||||
|
@ -1524,7 +1686,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
*/
|
||||
@Override
|
||||
public void checkAndUpdate(String bpid, long blockId, File diskFile,
|
||||
File diskMetaFile, FsVolumeSpi vol) {
|
||||
File diskMetaFile, FsVolumeSpi vol) throws IOException {
|
||||
Block corruptBlock = null;
|
||||
ReplicaInfo memBlockInfo;
|
||||
synchronized (this) {
|
||||
|
@ -1557,6 +1719,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
if (blockScanner != null) {
|
||||
blockScanner.deleteBlock(bpid, new Block(blockId));
|
||||
}
|
||||
if (vol.isTransientStorage()) {
|
||||
lazyWriteReplicaTracker.discardReplica(bpid, blockId, true);
|
||||
}
|
||||
LOG.warn("Removed block " + blockId
|
||||
+ " from memory with missing block file on the disk");
|
||||
// Finally remove the metadata file
|
||||
|
@ -1580,6 +1745,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
if (blockScanner != null) {
|
||||
blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
|
||||
}
|
||||
if (vol.isTransientStorage()) {
|
||||
lazyWriteReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
|
||||
}
|
||||
LOG.warn("Added missing block to memory " + diskBlockInfo);
|
||||
return;
|
||||
}
|
||||
|
@ -1757,9 +1925,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final String bpid = oldBlock.getBlockPoolId();
|
||||
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
|
||||
LOG.info("updateReplica: " + oldBlock
|
||||
+ ", recoveryId=" + recoveryId
|
||||
+ ", length=" + newlength
|
||||
+ ", replica=" + replica);
|
||||
+ ", recoveryId=" + recoveryId
|
||||
+ ", length=" + newlength
|
||||
+ ", replica=" + replica);
|
||||
|
||||
//check replica
|
||||
if (replica == null) {
|
||||
|
@ -2019,5 +2187,123 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
|
||||
nbytes, flags);
|
||||
}
|
||||
|
||||
private static class BlockIdPair {
|
||||
final String bpid;
|
||||
final long blockId;
|
||||
|
||||
BlockIdPair(final String bpid, final long blockId) {
|
||||
this.bpid = bpid;
|
||||
this.blockId = blockId;
|
||||
}
|
||||
}
|
||||
|
||||
private class LazyWriter implements Runnable {
|
||||
private volatile boolean shouldRun = true;
|
||||
final int checkpointerInterval;
|
||||
|
||||
final private Queue<BlockIdPair> blocksPendingCheckpoint;
|
||||
|
||||
public LazyWriter(final int checkpointerInterval) {
|
||||
this.checkpointerInterval = checkpointerInterval;
|
||||
blocksPendingCheckpoint = new LinkedList<BlockIdPair>();
|
||||
}
|
||||
|
||||
// Schedule a replica for writing to persistent storage.
|
||||
public synchronized void addReplicaToLazyWriteQueue(
|
||||
String bpid, long blockId) {
|
||||
LOG.info("Block with blockId=" + blockId + "; bpid=" + bpid + " added to lazy writer queue");
|
||||
blocksPendingCheckpoint.add(new BlockIdPair(bpid, blockId));
|
||||
}
|
||||
|
||||
private void moveReplicaToNewVolume(String bpid, long blockId)
|
||||
throws IOException {
|
||||
|
||||
LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
|
||||
|
||||
FsVolumeImpl targetVolume = null;
|
||||
Block block = null;
|
||||
File blockFile = null;
|
||||
|
||||
synchronized (this) {
|
||||
block = getStoredBlock(bpid, blockId);
|
||||
blockFile = getFile(bpid, blockId);
|
||||
|
||||
if (block == null) {
|
||||
// The block was deleted before it could be checkpointed.
|
||||
return;
|
||||
}
|
||||
|
||||
// Pick a target volume for the block.
|
||||
targetVolume = volumes.getNextVolume(
|
||||
StorageType.DEFAULT, block.getNumBytes());
|
||||
}
|
||||
|
||||
LOG.info("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
|
||||
lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
|
||||
File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
|
||||
.lazyPersistReplica(block, blockFile);
|
||||
lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
|
||||
LOG.info("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
|
||||
" to file " + savedBlockFile);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checkpoint a pending replica to persistent storage now.
|
||||
* @return true if there is more work to be done, false otherwise.
|
||||
*/
|
||||
private boolean saveNextReplica() {
|
||||
BlockIdPair blockIdPair = null;
|
||||
int moreWorkThreshold = 0;
|
||||
|
||||
try {
|
||||
synchronized (this) {
|
||||
// Dequeue the next replica waiting to be checkpointed.
|
||||
blockIdPair = blocksPendingCheckpoint.poll();
|
||||
if (blockIdPair == null) {
|
||||
LOG.info("LazyWriter has no blocks to persist. " +
|
||||
"Thread going to sleep.");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Move the replica outside the lock.
|
||||
moveReplicaToNewVolume(blockIdPair.bpid, blockIdPair.blockId);
|
||||
|
||||
} catch(IOException ioe) {
|
||||
// If we failed, put the block on the queue and let a retry
|
||||
// interval elapse before we try again so we don't try to keep
|
||||
// checkpointing the same block in a tight loop.
|
||||
synchronized (this) {
|
||||
blocksPendingCheckpoint.add(blockIdPair);
|
||||
++moreWorkThreshold;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
return blocksPendingCheckpoint.size() > moreWorkThreshold;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (fsRunning && shouldRun) {
|
||||
try {
|
||||
if (!saveNextReplica()) {
|
||||
Thread.sleep(checkpointerInterval * 1000);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("LazyWriter was interrupted, exiting");
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Ignoring exception in LazyWriter:", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
shouldRun = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -328,6 +328,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
|
||||
File finalizedDir = new File(bpCurrentDir,
|
||||
DataStorage.STORAGE_DIR_FINALIZED);
|
||||
File lazypersistDir = new File(bpCurrentDir,
|
||||
DataStorage.STORAGE_DIR_LAZY_PERSIST);
|
||||
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
|
||||
if (force) {
|
||||
FileUtil.fullyDelete(bpDir);
|
||||
|
@ -339,6 +341,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
!FileUtil.fullyDelete(finalizedDir)) {
|
||||
throw new IOException("Failed to delete " + finalizedDir);
|
||||
}
|
||||
if (!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) ||
|
||||
!FileUtil.fullyDelete(lazypersistDir)) {
|
||||
throw new IOException("Failed to delete " + lazypersistDir);
|
||||
}
|
||||
FileUtil.fullyDelete(tmpDir);
|
||||
for (File f : FileUtil.listFiles(bpCurrentDir)) {
|
||||
if (!f.delete()) {
|
||||
|
|
|
@ -0,0 +1,177 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.TreeMultimap;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
class LazyWriteReplicaTracker {
|
||||
|
||||
enum State {
|
||||
IN_MEMORY,
|
||||
LAZY_PERSIST_IN_PROGRESS,
|
||||
LAZY_PERSIST_COMPLETE,
|
||||
}
|
||||
|
||||
static class ReplicaState implements Comparable<ReplicaState> {
|
||||
|
||||
final String bpid;
|
||||
final long blockId;
|
||||
State state;
|
||||
|
||||
/**
|
||||
* transient storage volume that holds the original replica.
|
||||
*/
|
||||
final FsVolumeImpl transientVolume;
|
||||
|
||||
/**
|
||||
* Persistent volume that holds or will hold the saved replica.
|
||||
*/
|
||||
FsVolumeImpl lazyPersistVolume;
|
||||
File savedBlockFile;
|
||||
|
||||
ReplicaState(final String bpid, final long blockId, FsVolumeImpl transientVolume) {
|
||||
this.bpid = bpid;
|
||||
this.blockId = blockId;
|
||||
this.transientVolume = transientVolume;
|
||||
state = State.IN_MEMORY;
|
||||
lazyPersistVolume = null;
|
||||
savedBlockFile = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return bpid.hashCode() ^ (int) blockId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (this == other) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (other == null || getClass() != other.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ReplicaState otherState = (ReplicaState) other;
|
||||
return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(ReplicaState other) {
|
||||
if (blockId == other.blockId) {
|
||||
return 0;
|
||||
} else if (blockId < other.blockId) {
|
||||
return -1;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final FsDatasetImpl fsDataset;
|
||||
|
||||
/**
|
||||
* Map of blockpool ID to map of blockID to ReplicaInfo.
|
||||
*/
|
||||
final Map<String, Map<Long, ReplicaState>> replicaMaps;
|
||||
|
||||
/**
|
||||
* A map of blockId to persist complete time for transient blocks. This allows
|
||||
* us to evict LRU blocks from transient storage. Protected by 'this'
|
||||
* Object lock.
|
||||
*/
|
||||
final Map<ReplicaState, Long> persistTimeMap;
|
||||
|
||||
LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
|
||||
this.fsDataset = fsDataset;
|
||||
replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
|
||||
persistTimeMap = new HashMap<ReplicaState, Long>();
|
||||
}
|
||||
|
||||
TreeMultimap<Long, ReplicaState> getLruMap() {
|
||||
// TODO: This can be made more efficient.
|
||||
TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
|
||||
for (Map.Entry<ReplicaState, Long> entry : persistTimeMap.entrySet()) {
|
||||
reversedMap.put(entry.getValue(), entry.getKey());
|
||||
}
|
||||
return reversedMap;
|
||||
}
|
||||
|
||||
synchronized void addReplica(String bpid, long blockId,
|
||||
final FsVolumeImpl transientVolume) {
|
||||
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
|
||||
if (map == null) {
|
||||
map = new HashMap<Long, ReplicaState>();
|
||||
replicaMaps.put(bpid, map);
|
||||
}
|
||||
map.put(blockId, new ReplicaState(bpid, blockId, transientVolume));
|
||||
}
|
||||
|
||||
synchronized void recordStartLazyPersist(
|
||||
final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
|
||||
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
|
||||
ReplicaState replicaState = map.get(blockId);
|
||||
replicaState.state = State.LAZY_PERSIST_IN_PROGRESS;
|
||||
replicaState.lazyPersistVolume = checkpointVolume;
|
||||
}
|
||||
|
||||
synchronized void recordEndLazyPersist(
|
||||
final String bpid, final long blockId, File savedBlockFile) {
|
||||
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
|
||||
ReplicaState replicaState = map.get(blockId);
|
||||
|
||||
if (replicaState == null) {
|
||||
throw new IllegalStateException("Unknown replica bpid=" +
|
||||
bpid + "; blockId=" + blockId);
|
||||
}
|
||||
replicaState.state = State.LAZY_PERSIST_COMPLETE;
|
||||
replicaState.savedBlockFile = savedBlockFile;
|
||||
persistTimeMap.put(replicaState, System.currentTimeMillis() / 1000);
|
||||
}
|
||||
|
||||
synchronized void discardReplica(
|
||||
final String bpid, final long blockId, boolean force) {
|
||||
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
|
||||
ReplicaState replicaState = map.get(blockId);
|
||||
|
||||
if (replicaState == null) {
|
||||
if (force) {
|
||||
return;
|
||||
}
|
||||
throw new IllegalStateException("Unknown replica bpid=" +
|
||||
bpid + "; blockId=" + blockId);
|
||||
}
|
||||
|
||||
if (replicaState.state != State.LAZY_PERSIST_COMPLETE && !force) {
|
||||
throw new IllegalStateException("Discarding replica without " +
|
||||
"saving it to disk bpid=" + bpid + "; blockId=" + blockId);
|
||||
|
||||
}
|
||||
|
||||
map.remove(blockId);
|
||||
persistTimeMap.remove(replicaState);
|
||||
}
|
||||
}
|
|
@ -1111,6 +1111,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getVolumeInfoMap() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
|
|
Loading…
Reference in New Issue