diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java index 61827d1b76a..a56ccfe7d48 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java @@ -27,6 +27,7 @@ import java.nio.file.DirectoryStream; import java.nio.file.DirectoryIteratorException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.List; @@ -36,7 +37,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ChunkedArrayList; +import org.apache.hadoop.util.Shell; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; @@ -355,4 +356,56 @@ public class IOUtils { } return list; } + + /** + * Ensure that any writes to the given file is written to the storage device + * that contains it. This method opens channel on given File and closes it + * once the sync is done.
+ * Borrowed from Uwe Schindler in LUCENE-5588 + * @param fileToSync the file to fsync + */ + public static void fsync(File fileToSync) throws IOException { + if (!fileToSync.exists()) { + throw new FileNotFoundException( + "File/Directory " + fileToSync.getAbsolutePath() + " does not exist"); + } + boolean isDir = fileToSync.isDirectory(); + // If the file is a directory we have to open read-only, for regular files + // we must open r/w for the fsync to have an effect. See + // http://blog.httrack.com/blog/2013/11/15/ + // everything-you-always-wanted-to-know-about-fsync/ + try(FileChannel channel = FileChannel.open(fileToSync.toPath(), + isDir ? StandardOpenOption.READ : StandardOpenOption.WRITE)){ + fsync(channel, isDir); + } + } + + /** + * Ensure that any writes to the given file is written to the storage device + * that contains it. This method opens channel on given File and closes it + * once the sync is done. + * Borrowed from Uwe Schindler in LUCENE-5588 + * @param channel Channel to sync + * @param isDir if true, the given file is a directory (Channel should be + * opened for read and ignore IOExceptions, because not all file + * systems and operating systems allow to fsync on a directory) + * @throws IOException + */ + public static void fsync(FileChannel channel, boolean isDir) + throws IOException { + try { + channel.force(true); + } catch (IOException ioe) { + if (isDir) { + assert !(Shell.LINUX + || Shell.MAC) : "On Linux and MacOSX fsyncing a directory" + + " should not throw IOException, we just don't want to rely" + + " on that in production (undocumented)" + ". Got: " + ioe; + // Ignore exception if it is a directory + return; + } + // Throw original exception + throw ioe; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 81945f9afb1..9ed1766d3cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -126,6 +126,7 @@ class BlockReceiver implements Closeable { private boolean isPenultimateNode = false; private boolean syncOnClose; + private volatile boolean dirSyncOnFinalize; private long restartBudget; /** the reference of the volume where the block receiver writes to */ private ReplicaHandler replicaHandler; @@ -544,6 +545,9 @@ class BlockReceiver implements Closeable { // avoid double sync'ing on close if (syncBlock && lastPacketInBlock) { this.syncOnClose = false; + // sync directory for finalize irrespective of syncOnClose config since + // sync is requested. + this.dirSyncOnFinalize = true; } // update received bytes @@ -934,6 +938,7 @@ class BlockReceiver implements Closeable { boolean isReplaceBlock) throws IOException { syncOnClose = datanode.getDnConf().syncOnClose; + dirSyncOnFinalize = syncOnClose; boolean responderClosed = false; mirrorOut = mirrOut; mirrorAddr = mirrAddr; @@ -976,7 +981,7 @@ class BlockReceiver implements Closeable { } else { // for isDatnode or TRANSFER_FINALIZED // Finalize the block. - datanode.data.finalizeBlock(block); + datanode.data.finalizeBlock(block, dirSyncOnFinalize); } } datanode.metrics.incrBlocksWritten(); @@ -1499,7 +1504,7 @@ class BlockReceiver implements Closeable { BlockReceiver.this.close(); endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; block.setNumBytes(replicaInfo.getNumBytes()); - datanode.data.finalizeBlock(block); + datanode.data.finalizeBlock(block, dirSyncOnFinalize); } if (pinning) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java index 2483fd4f906..e1ed454ff5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java @@ -149,7 +149,24 @@ public class FileIoProvider { final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0); try { faultInjectorEventHook.beforeFileIo(volume, SYNC, 0); - fos.getChannel().force(true); + IOUtils.fsync(fos.getChannel(), false); + profilingEventHook.afterFileIo(volume, SYNC, begin, 0); + } catch (Exception e) { + onFailure(volume, begin); + throw e; + } + } + + /** + * Sync the given directory changes to durable device. + * @throws IOException + */ + public void dirSync(@Nullable FsVolumeSpi volume, File dir) + throws IOException { + final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0); + try { + faultInjectorEventHook.beforeFileIo(volume, SYNC, 0); + IOUtils.fsync(dir); profilingEventHook.afterFileIo(volume, SYNC, begin, 0); } catch (Exception e) { onFailure(volume, begin); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 4ce603fa583..4d08e1771bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -395,12 +395,14 @@ public interface FsDatasetSpi extends FSDatasetMBean { * Finalizes the block previously opened for writing using writeToBlock. * The block size is what is in the parameter b and it must match the amount * of data written + * @param block Block to be finalized + * @param fsyncDir whether to sync the directory changes to durable device. * @throws IOException * @throws ReplicaNotFoundException if the replica can not be found when the * block is been finalized. For instance, the block resides on an HDFS volume * that has been removed. */ - void finalizeBlock(ExtendedBlock b) throws IOException; + void finalizeBlock(ExtendedBlock b, boolean fsyncDir) throws IOException; /** * Unfinalizes the block previously opened for writing using writeToBlock. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 221bc6adb63..ef544910411 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -927,6 +927,18 @@ class FsDatasetImpl implements FsDatasetSpi { return dstfile; } + private void fsyncDirectory(FsVolumeSpi volume, File... dirs) + throws IOException { + FileIoProvider fileIoProvider = datanode.getFileIoProvider(); + for (File dir : dirs) { + try { + fileIoProvider.dirSync(volume, dir); + } catch (IOException e) { + throw new IOException("Failed to sync " + dir, e); + } + } + } + /** * Copy the block and meta files for the given block to the given destination. * @return the new meta and block files. @@ -1021,7 +1033,8 @@ class FsDatasetImpl implements FsDatasetSpi { targetVolume, blockFiles[0].getParentFile(), 0); newReplicaInfo.setNumBytes(blockFiles[1].length()); // Finalize the copied files - newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo); + newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo, + false); try(AutoCloseableLock lock = datasetLock.acquire()) { // Increment numBlocks here as this block moved without knowing to BPS FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume(); @@ -1394,7 +1407,7 @@ class FsDatasetImpl implements FsDatasetSpi { bumpReplicaGS(replicaInfo, newGS); // finalize the replica if RBW if (replicaInfo.getState() == ReplicaState.RBW) { - finalizeReplica(b.getBlockPoolId(), replicaInfo); + finalizeReplica(b.getBlockPoolId(), replicaInfo, false); } return replicaInfo; } @@ -1728,7 +1741,8 @@ class FsDatasetImpl implements FsDatasetSpi { * Complete the block write! */ @Override // FsDatasetSpi - public void finalizeBlock(ExtendedBlock b) throws IOException { + public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) + throws IOException { try(AutoCloseableLock lock = datasetLock.acquire()) { if (Thread.interrupted()) { // Don't allow data modifications from interrupted threads @@ -1740,12 +1754,12 @@ class FsDatasetImpl implements FsDatasetSpi { // been opened for append but never modified return; } - finalizeReplica(b.getBlockPoolId(), replicaInfo); + finalizeReplica(b.getBlockPoolId(), replicaInfo, fsyncDir); } } private FinalizedReplica finalizeReplica(String bpid, - ReplicaInfo replicaInfo) throws IOException { + ReplicaInfo replicaInfo, boolean fsyncDir) throws IOException { try(AutoCloseableLock lock = datasetLock.acquire()) { FinalizedReplica newReplicaInfo = null; if (replicaInfo.getState() == ReplicaState.RUR && @@ -1765,7 +1779,15 @@ class FsDatasetImpl implements FsDatasetSpi { bpid, replicaInfo, f, replicaInfo.getBytesReserved()); newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile()); - + /* + * Sync the directory after rename from tmp/rbw to Finalized if + * configured. Though rename should be atomic operation, sync on both + * dest and src directories are done because IOUtils.fsync() calls + * directory's channel sync, not the journal itself. + */ + if (fsyncDir) { + fsyncDirectory(v, dest.getParentFile(), f.getParentFile()); + } if (v.isTransientStorage()) { releaseLockedMemory( replicaInfo.getOriginalBytesReserved() @@ -2742,12 +2764,12 @@ class FsDatasetImpl implements FsDatasetSpi { // but it is immediately converted to finalized state within the same // lock, so no need to update it. volumeMap.add(bpid, newReplicaInfo); - finalizeReplica(bpid, newReplicaInfo); + finalizeReplica(bpid, newReplicaInfo, false); } } // finalize the block - return finalizeReplica(bpid, rur); + return finalizeReplica(bpid, rur, false); } private File[] copyReplicaWithNewBlockIdAndGS( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index dd2ca99f62b..82d49f27a5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -638,7 +638,8 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override // FsDatasetSpi - public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { + public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir) + throws IOException { final Map map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index b97f5c84af1..ea28ea43f94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -804,10 +804,12 @@ public class TestDataNodeHotSwapVolumes { // Bypass the argument to FsDatasetImpl#finalizeBlock to verify that // the block is not removed, since the volume reference should not // be released at this point. - data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0]); + data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0], + (boolean) invocation.getArguments()[1]); return null; } - }).when(dn.data).finalizeBlock(any(ExtendedBlock.class)); + }).when(dn.data).finalizeBlock(any(ExtendedBlock.class), + Mockito.anyBoolean()); final CyclicBarrier barrier = new CyclicBarrier(2); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index dd7d239f6be..385d9103d56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -96,7 +96,7 @@ public class TestSimulatedFSDataset { out.close(); } b.setNumBytes(blockIdToLen(i)); - fsdataset.finalizeBlock(b); + fsdataset.finalizeBlock(b, false); assertEquals(blockIdToLen(i), fsdataset.getLength(b)); } return bytesAdded; @@ -295,7 +295,7 @@ public class TestSimulatedFSDataset { } try { - fsdataset.finalizeBlock(b); + fsdataset.finalizeBlock(b, false); assertTrue("Expected an IO exception", false); } catch (IOException e) { // ok - as expected diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 731bb0b6f07..a106f7ab735 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -179,7 +179,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi { } @Override - public void finalizeBlock(ExtendedBlock b) throws IOException { + public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) + throws IOException { } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 77b5258e67b..2e6749dae49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -565,7 +565,7 @@ public class TestFsDatasetImpl { // Lets wait for the other thread finish getting block report blockReportReceivedLatch.await(); - dataset.finalizeBlock(eb); + dataset.finalizeBlock(eb, false); LOG.info("FinalizeBlock finished"); } catch (Exception e) { LOG.warn("Exception caught. This should not affect the test", e);