From a56c5ef636413052fec62cee1a38b7e1455e389d Mon Sep 17 00:00:00 2001
From: Colin Patrick Mccabe
Date: Mon, 21 Dec 2015 14:13:36 -0800
Subject: [PATCH] HDFS-9589. Block files which have been hardlinked should be
 duplicated before the DataNode appends to them (cmccabe)

(cherry picked from commit bb540ba85aa37d9fe31e640665158afe8a936230)
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |  3 +
 .../hdfs/server/datanode/ReplicaInfo.java          | 79 +++++++++++++++++++
 .../fsdataset/impl/FsDatasetImpl.java              |  5 ++
 .../apache/hadoop/hdfs/TestFileAppend.java         | 66 ++++++++++++++++
 .../fsdataset/impl/FsDatasetTestUtil.java          |  6 ++
 5 files changed, 159 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 800e5a6319d..83be6e40f38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1664,6 +1664,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9347. Invariant assumption in TestQuorumJournalManager.shutdown()
     is wrong. (Wei-Chiu Chuang via zhz)
 
+    HDFS-9589. Block files which have been hardlinked should be duplicated
+    before the DataNode appends to them (cmccabe)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
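Background for the change below: during a DataNode layout upgrade, every block file under current/ is hardlinked into previous/, so the rollback snapshot costs no extra disk space. The hazard is that both names share one inode, so an append through the current/ name silently changes the previous/ copy as well. The following self-contained demonstration is not part of the patch; the file names are invented for illustration, and it assumes a local filesystem that supports hardlinks.

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class HardlinkHazardDemo {
  public static void main(String[] args) throws IOException {
    Path current = Paths.get("blk_1001");            // stands in for current/blk_X
    Path previous = Paths.get("blk_1001.snapshot");  // stands in for previous/blk_X
    Files.write(current, "old data".getBytes("UTF-8"));
    Files.createLink(previous, current);             // two names, one inode

    // Appending through "current" silently grows "previous" too.
    try (FileOutputStream out = new FileOutputStream(current.toFile(), true)) {
      out.write(" plus appended bytes".getBytes("UTF-8"));
    }
    System.out.println("snapshot name now holds " + Files.size(previous)
        + " bytes; it no longer matches its upgrade-time length of 8");
  }
}

Running this prints a size larger than the original eight bytes for the snapshot name, which is exactly the corruption a rollback would observe if the DataNode appended without first breaking the link.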
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index d19e656c9d6..3ef639033c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -18,12 +18,18 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.LightWeightResizableGSet;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -210,6 +216,79 @@ abstract public class ReplicaInfo extends Block
     return 0;
   }
 
+  /**
+   * Copy the specified file into a temporary file, then rename the
+   * temporary file to the original name. This will cause any
+   * hardlinks to the original file to be removed. The temporary
+   * files are created in the same directory. The temporary files will
+   * be recovered (especially on Windows) on datanode restart.
+   */
+  private void breakHardlinks(File file, Block b) throws IOException {
+    File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
+    try {
+      FileInputStream in = new FileInputStream(file);
+      try {
+        FileOutputStream out = new FileOutputStream(tmpFile);
+        try {
+          IOUtils.copyBytes(in, out, 16 * 1024);
+        } finally {
+          out.close();
+        }
+      } finally {
+        in.close();
+      }
+      if (file.length() != tmpFile.length()) {
+        throw new IOException("Copy of file " + file + " size " + file.length() +
+            " into file " + tmpFile +
+            " resulted in a size of " + tmpFile.length());
+      }
+      FileUtil.replaceFile(tmpFile, file);
+    } catch (IOException e) {
+      boolean done = tmpFile.delete();
+      if (!done) {
+        DataNode.LOG.info("detachFile failed to delete temporary file " +
+            tmpFile);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * This function "breaks hardlinks" to the current replica file.
+   *
+   * When doing a DataNode upgrade, we create a bunch of hardlinks to each block
+   * file. This cleverly ensures that both the old and the new storage
+   * directories can contain the same block file, without using additional space
+   * for the data.
+   *
+   * However, when we want to append to the replica file, we need to "break" the
+   * hardlink to ensure that the old snapshot continues to contain the old data
+   * length. If we failed to do that, we could roll back to the previous/
+   * directory during a downgrade, and find that the block contents were longer
+   * than they were at the time of upgrade.
+   *
+   * @return true unconditionally; failures surface as IOExceptions.
+   * @throws IOException
+   */
+  public boolean breakHardLinksIfNeeded() throws IOException {
+    File file = getBlockFile();
+    if (file == null || getVolume() == null) {
+      throw new IOException("detachBlock:Block not found. " + this);
+    }
+    File meta = getMetaFile();
+
+    int linkCount = HardLink.getLinkCount(file);
+    if (linkCount > 1) {
+      DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
+          "block " + this);
+      breakHardlinks(file, this);
+    }
+    if (HardLink.getLinkCount(meta) > 1) {
+      breakHardlinks(meta, this);
+    }
+    return true;
+  }
+
   @Override  //Object
   public String toString() {
     return getClass().getSimpleName()
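The breakHardlinks() method above uses nested try/finally blocks, matching the style of the branch this patch targets. For readers who want the copy-and-rename idea in isolation, here is a rough standalone sketch using java.nio.file; the unix:nlink attribute, the .tmp suffix, and the method name are assumptions made for this example, not what DatanodeUtil and FileUtil actually provide.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class BreakHardlinkSketch {
  // Sketch only: copy the file to a sibling temp name, then atomically
  // rename the copy over the original. The original name now points at a
  // fresh inode; any other hardlink keeps the old inode and old contents.
  static void breakHardlinkIfNeeded(Path file) throws IOException {
    int linkCount = (Integer) Files.getAttribute(file, "unix:nlink"); // POSIX only
    if (linkCount <= 1) {
      return; // no other name shares this inode; writing in place is safe
    }
    Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
    Files.copy(file, tmp, StandardCopyOption.REPLACE_EXISTING);
    if (Files.size(tmp) != Files.size(file)) {
      throw new IOException("truncated copy of " + file);
    }
    Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
  }

  public static void main(String[] args) throws IOException {
    breakHardlinkIfNeeded(Paths.get(args[0]));
  }
}

The production code instead checks HardLink.getLinkCount(), verifies lengths with File.length(), and places the temporary file where the DataNode's restart logic can recover it.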
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 0fabf92b02f..544169d9084 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1133,6 +1133,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     // If the block is cached, start uncaching it.
     cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
+
+    // If there are any hardlinks to the block, break them. This ensures we are
+    // not appending to a file that is part of a previous/ directory.
+    replicaInfo.breakHardLinksIfNeeded();
 
     // construct a RBW replica with the new GS
     File blkfile = replicaInfo.getBlockFile();
@@ -2498,6 +2502,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             + ", rur=" + rur);
       }
       if (rur.getNumBytes() > newlength) {
+        rur.breakHardLinksIfNeeded();
        truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
         if(!copyOnTruncate) {
           // update RUR with the new length
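Both call sites matter: opening a replica for append (first hunk) and truncating a replica during lease recovery (second hunk) modify the block file in place, so each must first ensure the file's inode is not shared with a previous/ snapshot. The patch relies on the existing org.apache.hadoop.fs.HardLink utility; this small sketch (invented file names, not patch code) shows the two HardLink calls involved.

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.HardLink;

public class HardLinkApiExample {
  public static void main(String[] args) throws IOException {
    File blockFile = new File(args[0]);
    File snapshot = new File(args[0] + ".snapshot");

    HardLink.createHardLink(blockFile, snapshot); // add a second name for the inode
    int n = HardLink.getLinkCount(blockFile);     // now at least 2
    System.out.println(blockFile + " has " + n + " hardlink(s)");
  }
}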
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index be1f985046e..ea1d0a67315 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.Assert;
@@ -109,6 +111,70 @@ public class TestFileAppend{
         expected, "Read 1", false);
   }
 
+  @Test
+  public void testBreakHardlinksIfNeeded() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    if (simulatedStorage) {
+      SimulatedFSDataset.setFactory(conf);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    FileSystem fs = cluster.getFileSystem();
+    InetSocketAddress addr = new InetSocketAddress("localhost",
+        cluster.getNameNodePort());
+    DFSClient client = new DFSClient(addr, conf);
+    try {
+      // create a new file, write to it and close it.
+      Path file1 = new Path("/filestatus.dat");
+      FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
+      writeFile(stm);
+      stm.close();
+
+      // Get a handle to the datanode
+      DataNode[] dn = cluster.listDataNodes();
+      assertTrue("There should be only one datanode but found " + dn.length,
+          dn.length == 1);
+
+      LocatedBlocks locations = client.getNamenode().getBlockLocations(
+          file1.toString(), 0, Long.MAX_VALUE);
+      List<LocatedBlock> blocks = locations.getLocatedBlocks();
+      final FsDatasetSpi<?> fsd = dn[0].getFSDataset();
+
+      //
+      // Create hard links for a few of the blocks
+      //
+      for (int i = 0; i < blocks.size(); i = i + 2) {
+        ExtendedBlock b = blocks.get(i).getBlock();
+        final File f = FsDatasetTestUtil.getBlockFile(
+            fsd, b.getBlockPoolId(), b.getLocalBlock());
+        File link = new File(f.toString() + ".link");
+        System.out.println("Creating hardlink for File " + f + " to " + link);
+        HardLink.createHardLink(f, link);
+      }
+
+      // Detach all blocks. This should remove hardlinks (if any)
+      for (int i = 0; i < blocks.size(); i++) {
+        ExtendedBlock b = blocks.get(i).getBlock();
+        System.out.println("breakHardlinksIfNeeded detaching block " + b);
+        assertTrue("breakHardlinksIfNeeded(" + b + ") should have returned true",
+            FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b));
+      }
+
+      // Since the hardlinks were already broken above, these calls should
+      // not need to copy anything; the method still returns true.
+      for (int i = 0; i < blocks.size(); i++) {
+        ExtendedBlock b = blocks.get(i).getBlock();
+        System.out.println("breakHardlinksIfNeeded re-attempting to " +
+            "detach block " + b);
+        assertTrue("breakHardlinksIfNeeded(" + b + ") should still return true",
+            FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b));
+      }
+    } finally {
+      client.close();
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Test a simple flush on a simple HDFS file.
    * @throws IOException an exception might be thrown
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index f4480a16b96..665befaee51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -55,6 +55,12 @@ public class FsDatasetTestUtil {
         .getGenerationStamp());
   }
 
+  public static boolean breakHardlinksIfNeeded(FsDatasetSpi<?> fsd,
+      ExtendedBlock block) throws IOException {
+    final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
+    return info.breakHardLinksIfNeeded();
+  }
+
   public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
       final String bpid, final long blockId) {
     return ((FsDatasetImpl)fsd).fetchReplicaInfo(bpid, blockId);
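The new test exercises the method's return value but never inspects link counts directly. A natural extension, sketched here as a hypothetical helper for TestFileAppend (not part of the patch, and assuming only the imports that file already has), would assert that every block file is back to a single link once breakHardlinksIfNeeded() has run:

// Hypothetical addition to TestFileAppend: after breakHardlinksIfNeeded()
// has been called for every block, no block file should share its inode
// with any other name.
private static void assertNoHardlinks(FsDatasetSpi<?> fsd,
    List<LocatedBlock> blocks) throws IOException {
  for (LocatedBlock lb : blocks) {
    ExtendedBlock b = lb.getBlock();
    File f = FsDatasetTestUtil.getBlockFile(
        fsd, b.getBlockPoolId(), b.getLocalBlock());
    Assert.assertEquals("block file should no longer share its inode",
        1, HardLink.getLinkCount(f));
  }
}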