HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring. (Arpit Agarwal)

Author: arp
Date:   2014-08-28 23:05:32 -07:00
Parent: 7e32be8768
Commit: 4cf9afacbe

6 changed files with 141 additions and 104 deletions

View File

@@ -24,3 +24,6 @@
 
     HDFS-6928. 'hdfs put' command should accept lazyPersist flag for testing.
     (Arpit Agarwal)
+
+    HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
+    (Arpit Agarwal)

View File

@@ -271,10 +271,13 @@ class BlockPoolSlice {
     return blockFile;
   }
 
-  File lazyPersistReplica(Block b, File f) throws IOException {
-    File blockFile = FsDatasetImpl.copyBlockFiles(b, f, lazypersistDir);
-    File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
-    dfsUsage.incDfsUsed(b.getNumBytes() + metaFile.length());
+  File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
+    if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
+      FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
+    }
+    File metaFile = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir);
+    File blockFile = Block.metaToBlockFile(metaFile);
+    dfsUsage.incDfsUsed(replicaInfo.getNumBytes() + metaFile.length());
     return blockFile;
   }

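Aside (not part of the patch): lazyPersistReplica now lets copyBlockFiles return the copied meta file and derives the block file from it with Block.metaToBlockFile. A minimal, self-contained sketch of the blk_<blockId>_<genStamp>.meta naming convention that derivation relies on; the helper below is hypothetical illustration, not the HDFS implementation.

import java.io.File;

public class MetaToBlockFileSketch {
  // Hypothetical helper: derive "blk_1073741825" from "blk_1073741825_1001.meta"
  // by stripping the trailing "_<genStamp>.meta" suffix.
  static File metaToBlockFile(File metaFile) {
    String name = metaFile.getName();
    int genStampStart = name.lastIndexOf('_');
    return new File(metaFile.getParent(), name.substring(0, genStampStart));
  }

  public static void main(String[] args) {
    File meta = new File("/tmp/lazypersist/blk_1073741825_1001.meta");
    System.out.println(metaToBlockFile(meta));   // /tmp/lazypersist/blk_1073741825
  }
}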
View File

@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
@@ -565,28 +566,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return dstfile;
   }
 
-  static File copyBlockFiles(Block b, File srcfile, File destdir)
+  /**
+   * Copy the block and meta files for the given block from the given
+   * @return the new meta file.
+   * @throws IOException
+   */
+  static File copyBlockFiles(ReplicaInfo replicaInfo, File destRoot)
       throws IOException {
-    final File dstfile = new File(destdir, b.getBlockName());
-    final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
-    final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
+    final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId());
+    final File dstFile = new File(destDir, replicaInfo.getBlockName());
+    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, replicaInfo.getGenerationStamp());
+    final File srcMeta = replicaInfo.getMetaFile();
+    final File srcFile = replicaInfo.getBlockFile();
     try {
-      FileUtils.copyFile(srcmeta, dstmeta);
+      FileUtils.copyFile(srcMeta, dstMeta);
     } catch (IOException e) {
-      throw new IOException("Failed to copy meta file for " + b
-          + " from " + srcmeta + " to " + dstmeta, e);
+      throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
     }
     try {
-      FileUtils.copyFile(srcfile, dstfile);
+      FileUtils.copyFile(srcFile, dstFile);
     } catch (IOException e) {
-      throw new IOException("Failed to copy block file for " + b
-          + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
+      throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
-          + " and " + srcfile + " to " + dstfile);
+      LOG.debug("addBlock: Moved " + srcMeta + " to " + dstMeta);
+      LOG.debug("addBlock: Moved " + srcFile + " to " + dstFile);
     }
-    return dstfile;
+    return dstMeta;
   }
 
   static private void truncateBlock(File blockFile, File metaFile,
@@ -1174,10 +1180,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       if (v.isTransientStorage()) {
         lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
-
-        // Schedule a checkpoint.
-        ((LazyWriter) lazyWriter.getRunnable())
-            .addReplicaToLazyWriteQueue(bpid, replicaInfo.getBlockId());
       }
     }
     volumeMap.add(bpid, newReplicaInfo);
@ -2188,32 +2190,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
nbytes, flags); nbytes, flags);
} }
private static class BlockIdPair { class LazyWriter implements Runnable {
final String bpid;
final long blockId;
BlockIdPair(final String bpid, final long blockId) {
this.bpid = bpid;
this.blockId = blockId;
}
}
private class LazyWriter implements Runnable {
private volatile boolean shouldRun = true; private volatile boolean shouldRun = true;
final int checkpointerInterval; final int checkpointerInterval;
final private Queue<BlockIdPair> blocksPendingCheckpoint;
public LazyWriter(final int checkpointerInterval) { public LazyWriter(final int checkpointerInterval) {
this.checkpointerInterval = checkpointerInterval; this.checkpointerInterval = checkpointerInterval;
blocksPendingCheckpoint = new LinkedList<BlockIdPair>();
}
// Schedule a replica for writing to persistent storage.
public synchronized void addReplicaToLazyWriteQueue(
String bpid, long blockId) {
LOG.info("Block with blockId=" + blockId + "; bpid=" + bpid + " added to lazy writer queue");
blocksPendingCheckpoint.add(new BlockIdPair(bpid, blockId));
} }
private void moveReplicaToNewVolume(String bpid, long blockId) private void moveReplicaToNewVolume(String bpid, long blockId)
@@ -2221,76 +2203,85 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
 
-      FsVolumeImpl targetVolume = null;
-      Block block = null;
-      File blockFile = null;
+      FsVolumeImpl targetVolume;
+      ReplicaInfo replicaInfo;
 
       synchronized (this) {
-        block = getStoredBlock(bpid, blockId);
-        blockFile = getFile(bpid, blockId);
+        replicaInfo = volumeMap.get(bpid, blockId);
 
-        if (block == null) {
-          // The block was deleted before it could be checkpointed.
+        if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
+          // The block was either deleted before it could be checkpointed or
+          // it is already on persistent storage. This can occur if a second
+          // replica on persistent storage was found after the lazy write was
+          // scheduled.
           return;
         }
 
         // Pick a target volume for the block.
         targetVolume = volumes.getNextVolume(
-            StorageType.DEFAULT, block.getNumBytes());
+            StorageType.DEFAULT, replicaInfo.getNumBytes());
       }
 
-      LOG.info("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
+      }
+
       lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
       File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
-                                        .lazyPersistReplica(block, blockFile);
+                                        .lazyPersistReplica(replicaInfo);
       lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
-      LOG.info("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
-               " to file " + savedBlockFile);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
+                  " to file " + savedBlockFile);
+      }
     }
 
     /**
      * Checkpoint a pending replica to persistent storage now.
-     * If we fail then move the replica to the end of the queue.
     * @return true if there is more work to be done, false otherwise.
     */
    private boolean saveNextReplica() {
-      BlockIdPair blockIdPair = null;
-      int moreWorkThreshold = 0;
+      LazyWriteReplicaTracker.ReplicaState replicaState = null;
+      boolean succeeded = false;
 
      try {
        synchronized (this) {
-          // Dequeue the next replica waiting to be checkpointed.
-          blockIdPair = blocksPendingCheckpoint.poll();
-          if (blockIdPair == null) {
-            LOG.info("LazyWriter has no blocks to persist. " +
-                "Thread going to sleep.");
+          replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
+          if (replicaState == null) {
            return false;
          }
        }
 
        // Move the replica outside the lock.
-        moveReplicaToNewVolume(blockIdPair.bpid, blockIdPair.blockId);
+        moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
+        succeeded = true;
      } catch(IOException ioe) {
-        // If we failed, put the block on the queue and let a retry
-        // interval elapse before we try again so we don't try to keep
-        // checkpointing the same block in a tight loop.
-        synchronized (this) {
-          blocksPendingCheckpoint.add(blockIdPair);
-          ++moreWorkThreshold;
+        LOG.warn("Exception saving replica " + replicaState, ioe);
+      } finally {
+        if (!succeeded && replicaState != null) {
+          lazyWriteReplicaTracker.reenqueueReplica(replicaState);
        }
      }
 
-      synchronized (this) {
-        return blocksPendingCheckpoint.size() > moreWorkThreshold;
-      }
+      return succeeded;
    }
 
    @Override
    public void run() {
+      int numSuccessiveFailures = 0;
+
      while (fsRunning && shouldRun) {
        try {
-          if (!saveNextReplica()) {
+          numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
+
+          // Sleep if we have no more work to do or if it looks like we are not
+          // making any forward progress. This is to ensure that if all persist
+          // operations are failing we don't keep retrying them in a tight loop.
+          if (numSuccessiveFailures == lazyWriteReplicaTracker.numReplicasNotPersisted()) {
            Thread.sleep(checkpointerInterval * 1000);
+            numSuccessiveFailures = 0;
          }
        } catch (InterruptedException e) {
          LOG.info("LazyWriter was interrupted, exiting");

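Aside (not part of the patch): the reworked LazyWriter retries a failed persist by re-enqueueing the replica and only sleeps once it has failed as many consecutive times as there are replicas left to persist, i.e. when no forward progress is being made. Below is a simplified, self-contained model of that loop; it is a toy class, not HDFS code, and the empty-queue backoff condition is slightly generalized here.

import java.util.ArrayDeque;
import java.util.Queue;

public class LazyWriterBackoffSketch {
  static final Queue<String> pending = new ArrayDeque<>();

  // Stand-in for moveReplicaToNewVolume(); in this toy model even-length names fail.
  static boolean persist(String replica) {
    return replica.length() % 2 != 0;
  }

  // Mirrors saveNextReplica(): dequeue one replica, try to persist it,
  // re-enqueue it on failure, and report whether progress was made.
  static boolean saveNextReplica() {
    String replica = pending.poll();
    if (replica == null) {
      return false;                 // nothing queued
    }
    if (persist(replica)) {
      return true;                  // progress
    }
    pending.add(replica);           // failed: retry later from the back of the queue
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    pending.add("blk_1");
    pending.add("blk_22");          // always fails in this toy model

    int numSuccessiveFailures = 0;
    for (int i = 0; i < 10; i++) {
      numSuccessiveFailures = saveNextReplica() ? 0 : numSuccessiveFailures + 1;
      // Back off when every remaining replica has just failed (or nothing is
      // queued), so a persistently failing replica cannot spin this loop.
      if (pending.isEmpty() || numSuccessiveFailures >= pending.size()) {
        Thread.sleep(10);           // stand-in for checkpointerInterval seconds
        numSuccessiveFailures = 0;
      }
    }
    System.out.println("Still pending: " + pending);
  }
}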
View File

@@ -255,22 +255,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
     getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
   }
 
-  /**
-   * Add replicas under the given directory to the volume map
-   * @param volumeMap the replicas map
-   * @param dir an input directory
-   * @param isFinalized true if the directory has finalized replicas;
-   *                    false if the directory has rbw replicas
-   * @throws IOException
-   */
-  void addToReplicasMap(String bpid, ReplicaMap volumeMap,
-                        File dir, boolean isFinalized) throws IOException {
-    BlockPoolSlice bp = getBlockPoolSlice(bpid);
-    // TODO move this up
-    // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
-    bp.addToReplicasMap(volumeMap, dir, isFinalized);
-  }
-
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();

View File

@@ -19,12 +19,11 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.collect.Multimap;
 import com.google.common.collect.TreeMultimap;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
 import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 class LazyWriteReplicaTracker {
@@ -43,7 +42,7 @@ class LazyWriteReplicaTracker {
     /**
      * transient storage volume that holds the original replica.
      */
-    final FsVolumeImpl transientVolume;
+    final FsVolumeSpi transientVolume;
 
     /**
      * Persistent volume that holds or will hold the saved replica.
@@ -51,7 +50,7 @@ class LazyWriteReplicaTracker {
     FsVolumeImpl lazyPersistVolume;
     File savedBlockFile;
 
-    ReplicaState(final String bpid, final long blockId, FsVolumeImpl transientVolume) {
+    ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) {
       this.bpid = bpid;
       this.blockId = blockId;
       this.transientVolume = transientVolume;
@@ -60,6 +59,11 @@ class LazyWriteReplicaTracker {
       savedBlockFile = null;
     }
 
+    @Override
+    public String toString() {
+      return "[Bpid=" + bpid + ";blockId=" + blockId + "]";
+    }
+
     @Override
     public int hashCode() {
       return bpid.hashCode() ^ (int) blockId;
@@ -98,36 +102,44 @@ class LazyWriteReplicaTracker {
    */
   final Map<String, Map<Long, ReplicaState>> replicaMaps;
 
+  /**
+   * Queue of replicas that need to be written to disk.
+   */
+  final Queue<ReplicaState> replicasNotPersisted;
+
   /**
    * A map of blockId to persist complete time for transient blocks. This allows
    * us to evict LRU blocks from transient storage. Protected by 'this'
    * Object lock.
    */
-  final Map<ReplicaState, Long> persistTimeMap;
+  final Map<ReplicaState, Long> replicasPersisted;
 
   LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
     this.fsDataset = fsDataset;
     replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
-    persistTimeMap = new HashMap<ReplicaState, Long>();
+    replicasNotPersisted = new LinkedList<ReplicaState>();
+    replicasPersisted = new HashMap<ReplicaState, Long>();
   }
 
   TreeMultimap<Long, ReplicaState> getLruMap() {
     // TODO: This can be made more efficient.
     TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
-    for (Map.Entry<ReplicaState, Long> entry : persistTimeMap.entrySet()) {
+    for (Map.Entry<ReplicaState, Long> entry : replicasPersisted.entrySet()) {
       reversedMap.put(entry.getValue(), entry.getKey());
     }
     return reversedMap;
   }
 
   synchronized void addReplica(String bpid, long blockId,
-                               final FsVolumeImpl transientVolume) {
+                               final FsVolumeSpi transientVolume) {
     Map<Long, ReplicaState> map = replicaMaps.get(bpid);
     if (map == null) {
       map = new HashMap<Long, ReplicaState>();
       replicaMaps.put(bpid, map);
     }
-    map.put(blockId, new ReplicaState(bpid, blockId, transientVolume));
+    ReplicaState replicaState = new ReplicaState(bpid, blockId, transientVolume);
+    map.put(blockId, replicaState);
+    replicasNotPersisted.add(replicaState);
   }
 
   synchronized void recordStartLazyPersist(
@@ -149,12 +161,49 @@ class LazyWriteReplicaTracker {
     }
 
     replicaState.state = State.LAZY_PERSIST_COMPLETE;
     replicaState.savedBlockFile = savedBlockFile;
-    persistTimeMap.put(replicaState, System.currentTimeMillis() / 1000);
+
+    if (replicasNotPersisted.peek() == replicaState) {
+      // Common case.
+      replicasNotPersisted.remove();
+    } else {
+      // Should never occur in practice as lazy writer always persists
+      // the replica at the head of the queue before moving to the next
+      // one.
+      replicasNotPersisted.remove(replicaState);
+    }
+
+    replicasPersisted.put(replicaState, System.currentTimeMillis() / 1000);
+  }
+
+  synchronized ReplicaState dequeueNextReplicaToPersist() {
+    while (replicasNotPersisted.size() != 0) {
+      ReplicaState replicaState = replicasNotPersisted.remove();
+      Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
+
+      if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
+        return replicaState;
+      }
+
+      // The replica no longer exists, look for the next one.
+    }
+    return null;
+  }
+
+  synchronized void reenqueueReplica(final ReplicaState replicaState) {
+    replicasNotPersisted.add(replicaState);
+  }
+
+  synchronized int numReplicasNotPersisted() {
+    return replicasNotPersisted.size();
   }
 
   synchronized void discardReplica(
       final String bpid, final long blockId, boolean force) {
     Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+
+    if (map == null) {
+      return;
+    }
+
     ReplicaState replicaState = map.get(blockId);
 
     if (replicaState == null) {
@@ -172,6 +221,9 @@ class LazyWriteReplicaTracker {
     }
 
     map.remove(blockId);
-    persistTimeMap.remove(replicaState);
+    replicasPersisted.remove(replicaState);
+
+    // Leave the replica in replicasNotPersisted if its present.
+    // dequeueNextReplicaToPersist will GC it eventually.
   }
 }

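Aside (not part of the patch): LazyWriteReplicaTracker now owns the pending-persist queue. discardReplica deliberately leaves stale entries in replicasNotPersisted, and dequeueNextReplicaToPersist drops them lazily when no live replica exists for the block ID. A toy, self-contained model of that lifecycle (illustrative names, not the HDFS class):

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

public class ReplicaQueueSketch {
  private final Set<Long> liveReplicas = new HashSet<>();      // stand-in for replicaMaps
  private final Queue<Long> notPersisted = new ArrayDeque<>(); // replicasNotPersisted

  synchronized void addReplica(long blockId) {
    liveReplicas.add(blockId);
    notPersisted.add(blockId);
  }

  synchronized void discardReplica(long blockId) {
    // Deliberately leaves any queue entry in place; it is dropped on dequeue.
    liveReplicas.remove(blockId);
  }

  synchronized Long dequeueNextReplicaToPersist() {
    while (!notPersisted.isEmpty()) {
      Long blockId = notPersisted.remove();
      if (liveReplicas.contains(blockId)) {
        return blockId;
      }
      // The replica no longer exists; look for the next one.
    }
    return null;
  }

  synchronized void reenqueueReplica(long blockId) {
    notPersisted.add(blockId);
  }

  public static void main(String[] args) {
    ReplicaQueueSketch tracker = new ReplicaQueueSketch();
    tracker.addReplica(1L);
    tracker.addReplica(2L);
    tracker.discardReplica(1L);   // block deleted before it could be persisted
    System.out.println(tracker.dequeueNextReplicaToPersist()); // prints 2; block 1 is skipped
  }
}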
View File

@@ -40,7 +40,9 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -61,6 +63,7 @@ public class TestLazyPersistFiles {
   static {
     ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
   }
 
   private static short REPL_FACTOR = 1;
@@ -68,7 +71,7 @@ public class TestLazyPersistFiles {
   private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   private static final long HEARTBEAT_INTERVAL_SEC = 1;
   private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final int LAZY_WRITER_INTERVAL_SEC = 3;
+  private static final int LAZY_WRITER_INTERVAL_SEC = 1;
   private static final int BUFFER_LENGTH = 4096;
 
   private MiniDFSCluster cluster;
@@ -283,8 +286,9 @@
     File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
 
     for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
-      File persistedBlockFile = new File(lazyPersistDir, "blk_" + lb.getBlock().getBlockId());
-      if (persistedBlockFile.exists()) {
+      File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, lb.getBlock().getBlockId());
+      File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+      if (blockFile.exists()) {
         // Found a persisted copy for this block!
         boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
         assertThat(added, is(true));