HDFS-5042. Completed files lost after power failure. Contributed by Vinayakumar B.

This commit is contained in:
Kihwal Lee 2017-05-31 11:02:34 -05:00
parent f5f12b576e
commit 5b0baeab5e
10 changed files with 123 additions and 20 deletions

View File

@ -27,6 +27,7 @@ import java.nio.file.DirectoryStream;
import java.nio.file.DirectoryIteratorException; import java.nio.file.DirectoryIteratorException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -36,7 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ChunkedArrayList; import org.apache.hadoop.util.Shell;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@ -355,4 +356,56 @@ public class IOUtils {
} }
return list; return list;
} }
/**
* Ensure that any writes to the given file is written to the storage device
* that contains it. This method opens channel on given File and closes it
* once the sync is done.<br>
* Borrowed from Uwe Schindler in LUCENE-5588
* @param fileToSync the file to fsync
*/
public static void fsync(File fileToSync) throws IOException {
if (!fileToSync.exists()) {
throw new FileNotFoundException(
"File/Directory " + fileToSync.getAbsolutePath() + " does not exist");
}
boolean isDir = fileToSync.isDirectory();
// If the file is a directory we have to open read-only, for regular files
// we must open r/w for the fsync to have an effect. See
// http://blog.httrack.com/blog/2013/11/15/
// everything-you-always-wanted-to-know-about-fsync/
try(FileChannel channel = FileChannel.open(fileToSync.toPath(),
isDir ? StandardOpenOption.READ : StandardOpenOption.WRITE)){
fsync(channel, isDir);
}
}
/**
* Ensure that any writes to the given file is written to the storage device
* that contains it. This method opens channel on given File and closes it
* once the sync is done.
* Borrowed from Uwe Schindler in LUCENE-5588
* @param channel Channel to sync
* @param isDir if true, the given file is a directory (Channel should be
* opened for read and ignore IOExceptions, because not all file
* systems and operating systems allow to fsync on a directory)
* @throws IOException
*/
public static void fsync(FileChannel channel, boolean isDir)
throws IOException {
try {
channel.force(true);
} catch (IOException ioe) {
if (isDir) {
assert !(Shell.LINUX
|| Shell.MAC) : "On Linux and MacOSX fsyncing a directory"
+ " should not throw IOException, we just don't want to rely"
+ " on that in production (undocumented)" + ". Got: " + ioe;
// Ignore exception if it is a directory
return;
}
// Throw original exception
throw ioe;
}
}
} }

View File

@ -126,6 +126,7 @@ class BlockReceiver implements Closeable {
private boolean isPenultimateNode = false; private boolean isPenultimateNode = false;
private boolean syncOnClose; private boolean syncOnClose;
private volatile boolean dirSyncOnFinalize;
private long restartBudget; private long restartBudget;
/** the reference of the volume where the block receiver writes to */ /** the reference of the volume where the block receiver writes to */
private ReplicaHandler replicaHandler; private ReplicaHandler replicaHandler;
@ -544,6 +545,9 @@ class BlockReceiver implements Closeable {
// avoid double sync'ing on close // avoid double sync'ing on close
if (syncBlock && lastPacketInBlock) { if (syncBlock && lastPacketInBlock) {
this.syncOnClose = false; this.syncOnClose = false;
// sync directory for finalize irrespective of syncOnClose config since
// sync is requested.
this.dirSyncOnFinalize = true;
} }
// update received bytes // update received bytes
@ -934,6 +938,7 @@ class BlockReceiver implements Closeable {
boolean isReplaceBlock) throws IOException { boolean isReplaceBlock) throws IOException {
syncOnClose = datanode.getDnConf().syncOnClose; syncOnClose = datanode.getDnConf().syncOnClose;
dirSyncOnFinalize = syncOnClose;
boolean responderClosed = false; boolean responderClosed = false;
mirrorOut = mirrOut; mirrorOut = mirrOut;
mirrorAddr = mirrAddr; mirrorAddr = mirrAddr;
@ -976,7 +981,7 @@ class BlockReceiver implements Closeable {
} else { } else {
// for isDatnode or TRANSFER_FINALIZED // for isDatnode or TRANSFER_FINALIZED
// Finalize the block. // Finalize the block.
datanode.data.finalizeBlock(block); datanode.data.finalizeBlock(block, dirSyncOnFinalize);
} }
} }
datanode.metrics.incrBlocksWritten(); datanode.metrics.incrBlocksWritten();
@ -1499,7 +1504,7 @@ class BlockReceiver implements Closeable {
BlockReceiver.this.close(); BlockReceiver.this.close();
endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes()); block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block); datanode.data.finalizeBlock(block, dirSyncOnFinalize);
} }
if (pinning) { if (pinning) {

View File

@ -149,7 +149,24 @@ public class FileIoProvider {
final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0); final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0);
try { try {
faultInjectorEventHook.beforeFileIo(volume, SYNC, 0); faultInjectorEventHook.beforeFileIo(volume, SYNC, 0);
fos.getChannel().force(true); IOUtils.fsync(fos.getChannel(), false);
profilingEventHook.afterFileIo(volume, SYNC, begin, 0);
} catch (Exception e) {
onFailure(volume, begin);
throw e;
}
}
/**
* Sync the given directory changes to durable device.
* @throws IOException
*/
public void dirSync(@Nullable FsVolumeSpi volume, File dir)
throws IOException {
final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0);
try {
faultInjectorEventHook.beforeFileIo(volume, SYNC, 0);
IOUtils.fsync(dir);
profilingEventHook.afterFileIo(volume, SYNC, begin, 0); profilingEventHook.afterFileIo(volume, SYNC, begin, 0);
} catch (Exception e) { } catch (Exception e) {
onFailure(volume, begin); onFailure(volume, begin);

View File

@ -395,12 +395,14 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Finalizes the block previously opened for writing using writeToBlock. * Finalizes the block previously opened for writing using writeToBlock.
* The block size is what is in the parameter b and it must match the amount * The block size is what is in the parameter b and it must match the amount
* of data written * of data written
* @param block Block to be finalized
* @param fsyncDir whether to sync the directory changes to durable device.
* @throws IOException * @throws IOException
* @throws ReplicaNotFoundException if the replica can not be found when the * @throws ReplicaNotFoundException if the replica can not be found when the
* block is been finalized. For instance, the block resides on an HDFS volume * block is been finalized. For instance, the block resides on an HDFS volume
* that has been removed. * that has been removed.
*/ */
void finalizeBlock(ExtendedBlock b) throws IOException; void finalizeBlock(ExtendedBlock b, boolean fsyncDir) throws IOException;
/** /**
* Unfinalizes the block previously opened for writing using writeToBlock. * Unfinalizes the block previously opened for writing using writeToBlock.

View File

@ -927,6 +927,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return dstfile; return dstfile;
} }
private void fsyncDirectory(FsVolumeSpi volume, File... dirs)
throws IOException {
FileIoProvider fileIoProvider = datanode.getFileIoProvider();
for (File dir : dirs) {
try {
fileIoProvider.dirSync(volume, dir);
} catch (IOException e) {
throw new IOException("Failed to sync " + dir, e);
}
}
}
/** /**
* Copy the block and meta files for the given block to the given destination. * Copy the block and meta files for the given block to the given destination.
* @return the new meta and block files. * @return the new meta and block files.
@ -1021,7 +1033,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
targetVolume, blockFiles[0].getParentFile(), 0); targetVolume, blockFiles[0].getParentFile(), 0);
newReplicaInfo.setNumBytes(blockFiles[1].length()); newReplicaInfo.setNumBytes(blockFiles[1].length());
// Finalize the copied files // Finalize the copied files
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo); newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo,
false);
try(AutoCloseableLock lock = datasetLock.acquire()) { try(AutoCloseableLock lock = datasetLock.acquire()) {
// Increment numBlocks here as this block moved without knowing to BPS // Increment numBlocks here as this block moved without knowing to BPS
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume(); FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
@ -1394,7 +1407,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
bumpReplicaGS(replicaInfo, newGS); bumpReplicaGS(replicaInfo, newGS);
// finalize the replica if RBW // finalize the replica if RBW
if (replicaInfo.getState() == ReplicaState.RBW) { if (replicaInfo.getState() == ReplicaState.RBW) {
finalizeReplica(b.getBlockPoolId(), replicaInfo); finalizeReplica(b.getBlockPoolId(), replicaInfo, false);
} }
return replicaInfo; return replicaInfo;
} }
@ -1728,7 +1741,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Complete the block write! * Complete the block write!
*/ */
@Override // FsDatasetSpi @Override // FsDatasetSpi
public void finalizeBlock(ExtendedBlock b) throws IOException { public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) { try(AutoCloseableLock lock = datasetLock.acquire()) {
if (Thread.interrupted()) { if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads // Don't allow data modifications from interrupted threads
@ -1740,12 +1754,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// been opened for append but never modified // been opened for append but never modified
return; return;
} }
finalizeReplica(b.getBlockPoolId(), replicaInfo); finalizeReplica(b.getBlockPoolId(), replicaInfo, fsyncDir);
} }
} }
private FinalizedReplica finalizeReplica(String bpid, private FinalizedReplica finalizeReplica(String bpid,
ReplicaInfo replicaInfo) throws IOException { ReplicaInfo replicaInfo, boolean fsyncDir) throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) { try(AutoCloseableLock lock = datasetLock.acquire()) {
FinalizedReplica newReplicaInfo = null; FinalizedReplica newReplicaInfo = null;
if (replicaInfo.getState() == ReplicaState.RUR && if (replicaInfo.getState() == ReplicaState.RUR &&
@ -1765,7 +1779,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
bpid, replicaInfo, f, replicaInfo.getBytesReserved()); bpid, replicaInfo, f, replicaInfo.getBytesReserved());
newReplicaInfo = newReplicaInfo =
new FinalizedReplica(replicaInfo, v, dest.getParentFile()); new FinalizedReplica(replicaInfo, v, dest.getParentFile());
/*
* Sync the directory after rename from tmp/rbw to Finalized if
* configured. Though rename should be atomic operation, sync on both
* dest and src directories are done because IOUtils.fsync() calls
* directory's channel sync, not the journal itself.
*/
if (fsyncDir) {
fsyncDirectory(v, dest.getParentFile(), f.getParentFile());
}
if (v.isTransientStorage()) { if (v.isTransientStorage()) {
releaseLockedMemory( releaseLockedMemory(
replicaInfo.getOriginalBytesReserved() replicaInfo.getOriginalBytesReserved()
@ -2742,12 +2764,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// but it is immediately converted to finalized state within the same // but it is immediately converted to finalized state within the same
// lock, so no need to update it. // lock, so no need to update it.
volumeMap.add(bpid, newReplicaInfo); volumeMap.add(bpid, newReplicaInfo);
finalizeReplica(bpid, newReplicaInfo); finalizeReplica(bpid, newReplicaInfo, false);
} }
} }
// finalize the block // finalize the block
return finalizeReplica(bpid, rur); return finalizeReplica(bpid, rur, false);
} }
private File[] copyReplicaWithNewBlockIdAndGS( private File[] copyReplicaWithNewBlockIdAndGS(

View File

@ -638,7 +638,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock()); BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) { if (binfo == null) {

View File

@ -804,10 +804,12 @@ public class TestDataNodeHotSwapVolumes {
// Bypass the argument to FsDatasetImpl#finalizeBlock to verify that // Bypass the argument to FsDatasetImpl#finalizeBlock to verify that
// the block is not removed, since the volume reference should not // the block is not removed, since the volume reference should not
// be released at this point. // be released at this point.
data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0]); data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0],
(boolean) invocation.getArguments()[1]);
return null; return null;
} }
}).when(dn.data).finalizeBlock(any(ExtendedBlock.class)); }).when(dn.data).finalizeBlock(any(ExtendedBlock.class),
Mockito.anyBoolean());
final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);

View File

@ -96,7 +96,7 @@ public class TestSimulatedFSDataset {
out.close(); out.close();
} }
b.setNumBytes(blockIdToLen(i)); b.setNumBytes(blockIdToLen(i));
fsdataset.finalizeBlock(b); fsdataset.finalizeBlock(b, false);
assertEquals(blockIdToLen(i), fsdataset.getLength(b)); assertEquals(blockIdToLen(i), fsdataset.getLength(b));
} }
return bytesAdded; return bytesAdded;
@ -295,7 +295,7 @@ public class TestSimulatedFSDataset {
} }
try { try {
fsdataset.finalizeBlock(b); fsdataset.finalizeBlock(b, false);
assertTrue("Expected an IO exception", false); assertTrue("Expected an IO exception", false);
} catch (IOException e) { } catch (IOException e) {
// ok - as expected // ok - as expected

View File

@ -179,7 +179,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
} }
@Override @Override
public void finalizeBlock(ExtendedBlock b) throws IOException { public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
throws IOException {
} }
@Override @Override

View File

@ -565,7 +565,7 @@ public class TestFsDatasetImpl {
// Lets wait for the other thread finish getting block report // Lets wait for the other thread finish getting block report
blockReportReceivedLatch.await(); blockReportReceivedLatch.await();
dataset.finalizeBlock(eb); dataset.finalizeBlock(eb, false);
LOG.info("FinalizeBlock finished"); LOG.info("FinalizeBlock finished");
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Exception caught. This should not affect the test", e); LOG.warn("Exception caught. This should not affect the test", e);