From 5a81e70c448cf9674323ed220c758726d51a1aec Mon Sep 17 00:00:00 2001
From: Kihwal Lee
Date: Wed, 31 May 2017 10:55:03 -0500
Subject: [PATCH] HDFS-5042. Completed files lost after power failure.
 Contributed by Vinayakumar B.

---
 .../java/org/apache/hadoop/io/IOUtils.java    | 55 ++++++++++++++++++-
 .../hdfs/server/datanode/BlockReceiver.java   |  9 ++-
 .../hdfs/server/datanode/FileIoProvider.java  | 19 ++++++-
 .../hdfs/server/datanode/LocalReplica.java    | 13 +++++
 .../datanode/fsdataset/FsDatasetSpi.java      |  4 +-
 .../fsdataset/impl/FsDatasetImpl.java         | 31 ++++++++---
 .../server/datanode/SimulatedFSDataset.java   |  3 +-
 .../datanode/TestDataNodeHotSwapVolumes.java  |  6 +-
 .../datanode/TestSimulatedFSDataset.java      |  4 +-
 .../extdataset/ExternalDatasetImpl.java       |  3 +-
 .../fsdataset/impl/TestFsDatasetImpl.java     |  2 +-
 11 files changed, 130 insertions(+), 19 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
index 0d2e7976b25..ee7264bdedc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
@@ -27,6 +27,7 @@
 import java.nio.file.DirectoryIteratorException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -36,7 +37,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ChunkedArrayList;
+import org.apache.hadoop.util.Shell;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -357,4 +358,56 @@ public static List<String> listDirectory(File dir, FilenameFilter filter)
     }
     return list;
   }
+
+  /**
+   * Ensure that any writes to the given file are written to the storage
+   * device that contains it. This method opens a channel on the given file
+   * and closes it once the sync is done.
+   * Borrowed from Uwe Schindler in LUCENE-5588.
+   * @param fileToSync the file to fsync
+   */
+  public static void fsync(File fileToSync) throws IOException {
+    if (!fileToSync.exists()) {
+      throw new FileNotFoundException(
+          "File/Directory " + fileToSync.getAbsolutePath() + " does not exist");
+    }
+    boolean isDir = fileToSync.isDirectory();
+    // If the file is a directory we have to open it read-only; for regular
+    // files we must open it read-write for the fsync to have an effect. See
+    // http://blog.httrack.com/blog/2013/11/15/
+    // everything-you-always-wanted-to-know-about-fsync/
+    try (FileChannel channel = FileChannel.open(fileToSync.toPath(),
+        isDir ? StandardOpenOption.READ : StandardOpenOption.WRITE)) {
+      fsync(channel, isDir);
+    }
+  }
+
+  /**
+   * Ensure that any writes to the file backing the given channel are written
+   * to the storage device that contains it. Unlike {@link #fsync(File)},
+   * this method does not close the channel once the sync is done.
+   * Borrowed from Uwe Schindler in LUCENE-5588.
+   * @param channel Channel to sync
+   * @param isDir if true, the given channel refers to a directory (it should
+   *          be opened for read, and IOExceptions are ignored because not
+   *          all file systems and operating systems allow fsync on a directory)
+   * @throws IOException
+   */
+  public static void fsync(FileChannel channel, boolean isDir)
+      throws IOException {
+    try {
+      channel.force(true);
+    } catch (IOException ioe) {
+      if (isDir) {
+        assert !(Shell.LINUX
+            || Shell.MAC) : "On Linux and MacOSX fsyncing a directory"
+                + " should not throw IOException, we just don't want to rely"
+                + " on that in production (undocumented). Got: " + ioe;
+        // Ignore the exception: it is a directory.
+        return;
+      }
+      // Throw the original exception for regular files.
+      throw ioe;
+    }
+  }
 }
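A brief aside on why both overloads matter: syncing the block file alone is not enough across a power failure, because the file's name lives in its parent directory, which must be synced separately. The sketch below (not part of the patch; path and data are illustrative) shows the intended calling pattern for the new helpers.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IOUtils;

public class DurableWriteSketch {
  public static void main(String[] args) throws IOException {
    File blockFile = new File("/data/dn/rbw/blk_0001");  // illustrative path
    try (FileOutputStream out = new FileOutputStream(blockFile)) {
      out.write(new byte[]{1, 2, 3});
      // Flush the file's data and metadata to the storage device.
      IOUtils.fsync(out.getChannel(), false);
    }
    // Sync the parent directory as well; without this the data can survive
    // a crash while the directory entry (the file's name) does not.
    IOUtils.fsync(blockFile.getParentFile());
  }
}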
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index a0e646decae..007770024f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -125,6 +125,7 @@ class BlockReceiver implements Closeable {
   private boolean isPenultimateNode = false;
 
   private boolean syncOnClose;
+  private volatile boolean dirSyncOnFinalize;
   private long restartBudget;
   /** the reference of the volume where the block receiver writes to */
   private ReplicaHandler replicaHandler;
@@ -547,6 +548,9 @@ private int receivePacket() throws IOException {
     // avoid double sync'ing on close
     if (syncBlock && lastPacketInBlock) {
       this.syncOnClose = false;
+      // Sync the directory on finalize irrespective of the syncOnClose
+      // config, since an explicit sync was requested.
+      this.dirSyncOnFinalize = true;
     }
 
     // update received bytes
@@ -937,6 +941,7 @@ void receiveBlock(
       boolean isReplaceBlock) throws IOException {
 
     syncOnClose = datanode.getDnConf().syncOnClose;
+    dirSyncOnFinalize = syncOnClose;
     boolean responderClosed = false;
     mirrorOut = mirrOut;
     mirrorAddr = mirrAddr;
@@ -979,7 +984,7 @@ void receiveBlock(
         } else {
           // for isDatnode or TRANSFER_FINALIZED
           // Finalize the block.
-          datanode.data.finalizeBlock(block);
+          datanode.data.finalizeBlock(block, dirSyncOnFinalize);
         }
       }
       datanode.metrics.incrBlocksWritten();
@@ -1502,7 +1507,7 @@ private void finalizeBlock(long startTime) throws IOException {
           BlockReceiver.this.close();
           endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
           block.setNumBytes(replicaInfo.getNumBytes());
-          datanode.data.finalizeBlock(block);
+          datanode.data.finalizeBlock(block, dirSyncOnFinalize);
         }
 
         if (pinning) {
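dirSyncOnFinalize is armed in two ways: unconditionally when dfs.datanode.synconclose is enabled, and per block when the last packet carries a sync request, for example from a writer that calls hsync() or creates the file with CreateFlag.SYNC_BLOCK. A hedged client-side sketch (path and payload are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SyncBlockWriterSketch {
  public static void main(String[] args) throws Exception {
    // On the datanode side, dfs.datanode.synconclose=true (hdfs-site.xml)
    // makes every block close durable; it is named here for reference only.
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf)) {
      Path p = new Path("/tmp/durable.dat");  // illustrative path
      try (FSDataOutputStream out = fs.create(p, true)) {
        out.write("must survive power failure".getBytes("UTF-8"));
        // hsync() asks every datanode in the pipeline to fsync the block
        // file; with this patch the finalize step also syncs the directory.
        out.hsync();
      }
    }
  }
}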
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
index 694eadd8c5e..b8e08d0142f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
@@ -149,7 +149,24 @@ public void sync(
     final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0);
     try {
       faultInjectorEventHook.beforeFileIo(volume, SYNC, 0);
-      fos.getChannel().force(true);
+      IOUtils.fsync(fos.getChannel(), false);
+      profilingEventHook.afterFileIo(volume, SYNC, begin, 0);
+    } catch (Exception e) {
+      onFailure(volume, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Sync the given directory's changes to the durable device.
+   * @throws IOException
+   */
+  public void dirSync(@Nullable FsVolumeSpi volume, File dir)
+      throws IOException {
+    final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0);
+    try {
+      faultInjectorEventHook.beforeFileIo(volume, SYNC, 0);
+      IOUtils.fsync(dir);
       profilingEventHook.afterFileIo(volume, SYNC, begin, 0);
     } catch (Exception e) {
       onFailure(volume, begin);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
index 1d46ddd2b2f..2c5af11d0d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
@@ -510,4 +510,17 @@ public static void truncateBlock(
       metaRAF.write(b, 0, checksumsize);
     }
   }
+
+  /**
+   * Sync the parent directory's changes to the durable device.
+   * @throws IOException
+   */
+  public void fsyncDirectory() throws IOException {
+    File dir = getDir();
+    try {
+      getFileIoProvider().dirSync(getVolume(), getDir());
+    } catch (IOException e) {
+      throw new IOException("Failed to sync " + dir, e);
+    }
+  }
 }
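dirSync() follows the same instrumentation pattern as every other FileIoProvider primitive: the operation is bracketed by profiling and fault-injection hooks so a slow or failing directory sync is charged to the right volume. A self-contained sketch of that pattern (the Listener interface and all names here are invented, not Hadoop APIs):

import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

public class InstrumentedIoSketch {
  public interface Listener {
    long before(String op);
    void after(String op, long begin);
    void onFailure(String op, long begin);
  }

  private final Listener listener;

  public InstrumentedIoSketch(Listener listener) {
    this.listener = listener;
  }

  public void dirSync(File dir) throws IOException {
    final long begin = listener.before("SYNC");
    try (FileChannel ch = FileChannel.open(dir.toPath(),
        StandardOpenOption.READ)) {
      ch.force(true);  // fsync the directory itself
      listener.after("SYNC", begin);
    } catch (IOException e) {
      listener.onFailure("SYNC", begin);
      throw e;
    }
  }
}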
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index fd3af5da9f6..7be42e8c17f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -394,12 +394,14 @@ Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
    * Finalizes the block previously opened for writing using writeToBlock.
    * The block size is what is in the parameter b and it must match the amount
    * of data written.
+   * @param block Block to be finalized
+   * @param fsyncDir whether to sync the directory changes to the durable device.
    * @throws IOException
    * @throws ReplicaNotFoundException if the replica cannot be found when the
    * block is being finalized. For instance, the block resides on an HDFS volume
    * that has been removed.
    */
-  void finalizeBlock(ExtendedBlock b) throws IOException;
+  void finalizeBlock(ExtendedBlock b, boolean fsyncDir) throws IOException;
 
   /**
    * Unfinalizes the block previously opened for writing using writeToBlock.
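Every FsDatasetSpi implementation now has to accept the flag, though only datasets backed by a local file system have a directory to sync; in-memory implementations such as SimulatedFSDataset (further below) simply ignore it. A hedged sketch (all names invented, not the real FsDatasetSpi contract) of a minimal implementation honoring the widened signature:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class InMemoryDatasetSketch {
  private final Map<Long, byte[]> finalized = new HashMap<>();
  private final Map<Long, byte[]> underConstruction = new HashMap<>();

  public synchronized void finalizeBlock(long blockId, boolean fsyncDir)
      throws IOException {
    byte[] data = underConstruction.remove(blockId);
    if (data == null) {
      throw new IOException("Block " + blockId + " is not being written to");
    }
    // fsyncDir is ignored: there is no directory entry to make durable.
    finalized.put(blockId, data);
  }
}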
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index eb4455b4f2e..11835a53fb3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -59,6 +59,8 @@
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -987,7 +989,8 @@ private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
         replicaInfo, smallBufferSize, conf);
 
     // Finalize the copied files
-    newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
+    newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo,
+        false);
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       // Increment numBlocks here as this block moved without knowing to BPS
       FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
@@ -1290,7 +1293,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
       replicaInfo.bumpReplicaGS(newGS);
       // finalize the replica if RBW
       if (replicaInfo.getState() == ReplicaState.RBW) {
-        finalizeReplica(b.getBlockPoolId(), replicaInfo);
+        finalizeReplica(b.getBlockPoolId(), replicaInfo, false);
       }
       return replicaInfo;
     }
@@ -1604,7 +1607,8 @@ public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams strea
    * Complete the block write!
    */
   @Override // FsDatasetSpi
-  public void finalizeBlock(ExtendedBlock b) throws IOException {
+  public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
+      throws IOException {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       if (Thread.interrupted()) {
         // Don't allow data modifications from interrupted threads
@@ -1616,12 +1620,12 @@ public void finalizeBlock(ExtendedBlock b) throws IOException {
         // been opened for append but never modified
         return;
       }
-      finalizeReplica(b.getBlockPoolId(), replicaInfo);
+      finalizeReplica(b.getBlockPoolId(), replicaInfo, fsyncDir);
     }
   }
 
   private ReplicaInfo finalizeReplica(String bpid,
-      ReplicaInfo replicaInfo) throws IOException {
+      ReplicaInfo replicaInfo, boolean fsyncDir) throws IOException {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo newReplicaInfo = null;
       if (replicaInfo.getState() == ReplicaState.RUR &&
@@ -1636,6 +1640,19 @@ private ReplicaInfo finalizeReplica(String bpid,
 
         newReplicaInfo = v.addFinalizedBlock(
             bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved());
+        /*
+         * Sync the directory after the rename from tmp/rbw to finalized, if
+         * configured. Although the rename should be atomic, both the dest
+         * and src directories are synced because IOUtils.fsync() forces the
+         * directory's channel, not the file system journal itself.
+         */
+        if (fsyncDir && newReplicaInfo instanceof FinalizedReplica
+            && replicaInfo instanceof LocalReplica) {
+          FinalizedReplica finalizedReplica = (FinalizedReplica) newReplicaInfo;
+          finalizedReplica.fsyncDirectory();
+          LocalReplica localReplica = (LocalReplica) replicaInfo;
+          localReplica.fsyncDirectory();
+        }
         if (v.isTransientStorage()) {
           releaseLockedMemory(
               replicaInfo.getOriginalBytesReserved()
@@ -2601,11 +2618,11 @@ private ReplicaInfo updateReplicaUnderRecovery(
         newReplicaInfo.setNumBytes(newlength);
         volumeMap.add(bpid, newReplicaInfo.getReplicaInfo());
-        finalizeReplica(bpid, newReplicaInfo.getReplicaInfo());
+        finalizeReplica(bpid, newReplicaInfo.getReplicaInfo(), false);
       }
     }
 
     // finalize the block
-    return finalizeReplica(bpid, rur);
+    return finalizeReplica(bpid, rur, false);
   }
 
   @Override // FsDatasetSpi
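The comment inside finalizeReplica() captures the crux of HDFS-5042: rename is atomic but not durable, so a finalized block's bytes could be safely on disk while the rename itself was lost on power failure. A standalone sketch of the rename-then-sync-both-parents pattern (paths are illustrative):

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class DurableRenameSketch {
  static void fsyncDir(Path dir) throws IOException {
    try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
      ch.force(true);
    }
  }

  public static void main(String[] args) throws IOException {
    Path src = Paths.get("/data/dn/rbw/blk_0001");        // illustrative
    Path dst = Paths.get("/data/dn/finalized/blk_0001");  // illustrative
    Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
    // The rename is atomic, but not necessarily durable: sync the directory
    // that gained the entry and the one that lost it.
    fsyncDir(dst.getParent());
    fsyncDir(src.getParent());
  }
}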
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index afa7a82fb04..212f95342ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -673,7 +673,8 @@ private Map<Block, BInfo> getMap(String bpid) throws IOException {
   }
 
   @Override // FsDatasetSpi
-  public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
+  public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
+      throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 9d140a1f793..0c0440d2349 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -802,10 +802,12 @@ public Object answer(InvocationOnMock invocation)
           // Bypass the argument to FsDatasetImpl#finalizeBlock to verify that
           // the block is not removed, since the volume reference should not
           // be released at this point.
-          data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0]);
+          data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0],
+              (boolean) invocation.getArguments()[1]);
           return null;
         }
-      }).when(dn.data).finalizeBlock(any(ExtendedBlock.class));
+      }).when(dn.data).finalizeBlock(any(ExtendedBlock.class),
+          Mockito.anyBoolean());
 
     final CyclicBarrier barrier = new CyclicBarrier(2);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 469e249b62c..4775fc7ac47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -96,7 +96,7 @@ static int addSomeBlocks(SimulatedFSDataset fsdataset, long startingBlockId,
         out.close();
       }
       b.setNumBytes(blockIdToLen(i));
-      fsdataset.finalizeBlock(b);
+      fsdataset.finalizeBlock(b, false);
       assertEquals(blockIdToLen(i), fsdataset.getLength(b));
     }
     return bytesAdded;
@@ -295,7 +295,7 @@ public void checkInvalidBlock(ExtendedBlock b) {
     }
 
     try {
-      fsdataset.finalizeBlock(b);
+      fsdataset.finalizeBlock(b, false);
       assertTrue("Expected an IO exception", false);
     } catch (IOException e) {
       // ok - as expected
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index d14bd72e17e..13502d96211 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -180,7 +180,8 @@ public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlkLen)
   }
 
   @Override
-  public void finalizeBlock(ExtendedBlock b) throws IOException {
+  public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
+      throws IOException {
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 32935613aac..2a3bf79c92e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -582,7 +582,7 @@ public void run() {
         try {
           // Let's wait for the other thread to finish getting the block report
           blockReportReceivedLatch.await();
-          dataset.finalizeBlock(eb);
+          dataset.finalizeBlock(eb, false);
           LOG.info("FinalizeBlock finished");
         } catch (Exception e) {
           LOG.warn("Exception caught. This should not affect the test", e);
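Tests that spy on the dataset must stub the widened signature, otherwise the two-argument call would bypass the stub. A hedged sketch of the updated Mockito pattern (the helper class itself is invented; the matchers are the stock Mockito ones used in the test above):

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doAnswer;

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

public class FinalizeBlockStubSketch {
  /** Forward a spied finalizeBlock call to the underlying dataset. */
  static void stubFinalizeBlock(FsDatasetSpi<?> spy, FsDatasetSpi<?> real)
      throws IOException {
    doAnswer(invocation -> {
      real.finalizeBlock((ExtendedBlock) invocation.getArguments()[0],
          (boolean) invocation.getArguments()[1]);
      return null;
    }).when(spy).finalizeBlock(any(ExtendedBlock.class), anyBoolean());
  }
}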