HDFS-9589. Block files which have been hardlinked should be duplicated before the DataNode appends to the them (cmccabe)
This commit is contained in:
parent
70d6f20126
commit
bb540ba85a
|
@ -2541,6 +2541,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-9347. Invariant assumption in TestQuorumJournalManager.shutdown()
|
HDFS-9347. Invariant assumption in TestQuorumJournalManager.shutdown()
|
||||||
is wrong. (Wei-Chiu Chuang via zhz)
|
is wrong. (Wei-Chiu Chuang via zhz)
|
||||||
|
|
||||||
|
HDFS-9589. Block files which have been hardlinked should be duplicated
|
||||||
|
before the DataNode appends to the them (cmccabe)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -18,12 +18,18 @@
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.HardLink;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.util.LightWeightResizableGSet;
|
import org.apache.hadoop.util.LightWeightResizableGSet;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -210,6 +216,79 @@ abstract public class ReplicaInfo extends Block
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copy specified file into a temporary file. Then rename the
|
||||||
|
* temporary file to the original name. This will cause any
|
||||||
|
* hardlinks to the original file to be removed. The temporary
|
||||||
|
* files are created in the same directory. The temporary files will
|
||||||
|
* be recovered (especially on Windows) on datanode restart.
|
||||||
|
*/
|
||||||
|
private void breakHardlinks(File file, Block b) throws IOException {
|
||||||
|
File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
|
||||||
|
try {
|
||||||
|
FileInputStream in = new FileInputStream(file);
|
||||||
|
try {
|
||||||
|
FileOutputStream out = new FileOutputStream(tmpFile);
|
||||||
|
try {
|
||||||
|
IOUtils.copyBytes(in, out, 16 * 1024);
|
||||||
|
} finally {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
if (file.length() != tmpFile.length()) {
|
||||||
|
throw new IOException("Copy of file " + file + " size " + file.length()+
|
||||||
|
" into file " + tmpFile +
|
||||||
|
" resulted in a size of " + tmpFile.length());
|
||||||
|
}
|
||||||
|
FileUtil.replaceFile(tmpFile, file);
|
||||||
|
} catch (IOException e) {
|
||||||
|
boolean done = tmpFile.delete();
|
||||||
|
if (!done) {
|
||||||
|
DataNode.LOG.info("detachFile failed to delete temporary file " +
|
||||||
|
tmpFile);
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This function "breaks hardlinks" to the current replica file.
|
||||||
|
*
|
||||||
|
* When doing a DataNode upgrade, we create a bunch of hardlinks to each block
|
||||||
|
* file. This cleverly ensures that both the old and the new storage
|
||||||
|
* directories can contain the same block file, without using additional space
|
||||||
|
* for the data.
|
||||||
|
*
|
||||||
|
* However, when we want to append to the replica file, we need to "break" the
|
||||||
|
* hardlink to ensure that the old snapshot continues to contain the old data
|
||||||
|
* length. If we failed to do that, we could roll back to the previous/
|
||||||
|
* directory during a downgrade, and find that the block contents were longer
|
||||||
|
* than they were at the time of upgrade.
|
||||||
|
*
|
||||||
|
* @return true only if data was copied.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public boolean breakHardLinksIfNeeded() throws IOException {
|
||||||
|
File file = getBlockFile();
|
||||||
|
if (file == null || getVolume() == null) {
|
||||||
|
throw new IOException("detachBlock:Block not found. " + this);
|
||||||
|
}
|
||||||
|
File meta = getMetaFile();
|
||||||
|
|
||||||
|
int linkCount = HardLink.getLinkCount(file);
|
||||||
|
if (linkCount > 1) {
|
||||||
|
DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
|
||||||
|
"block " + this);
|
||||||
|
breakHardlinks(file, this);
|
||||||
|
}
|
||||||
|
if (HardLink.getLinkCount(meta) > 1) {
|
||||||
|
breakHardlinks(meta, this);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override //Object
|
@Override //Object
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getClass().getSimpleName()
|
return getClass().getSimpleName()
|
||||||
|
|
|
@ -1128,6 +1128,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// If the block is cached, start uncaching it.
|
// If the block is cached, start uncaching it.
|
||||||
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
|
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
|
||||||
|
|
||||||
|
// If there are any hardlinks to the block, break them. This ensures we are
|
||||||
|
// not appending to a file that is part of a previous/ directory.
|
||||||
|
replicaInfo.breakHardLinksIfNeeded();
|
||||||
|
|
||||||
// construct a RBW replica with the new GS
|
// construct a RBW replica with the new GS
|
||||||
File blkfile = replicaInfo.getBlockFile();
|
File blkfile = replicaInfo.getBlockFile();
|
||||||
|
@ -2493,6 +2497,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
+ ", rur=" + rur);
|
+ ", rur=" + rur);
|
||||||
}
|
}
|
||||||
if (rur.getNumBytes() > newlength) {
|
if (rur.getNumBytes() > newlength) {
|
||||||
|
rur.breakHardLinksIfNeeded();
|
||||||
truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
|
truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
|
||||||
if(!copyOnTruncate) {
|
if(!copyOnTruncate) {
|
||||||
// update RUR with the new length
|
// update RUR with the new length
|
||||||
|
|
|
@ -43,6 +43,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -109,6 +111,70 @@ public class TestFileAppend{
|
||||||
expected, "Read 1", false);
|
expected, "Read 1", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBreakHardlinksIfNeeded() throws IOException {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
if (simulatedStorage) {
|
||||||
|
SimulatedFSDataset.setFactory(conf);
|
||||||
|
}
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
InetSocketAddress addr = new InetSocketAddress("localhost",
|
||||||
|
cluster.getNameNodePort());
|
||||||
|
DFSClient client = new DFSClient(addr, conf);
|
||||||
|
try {
|
||||||
|
// create a new file, write to it and close it.
|
||||||
|
Path file1 = new Path("/filestatus.dat");
|
||||||
|
FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
|
||||||
|
writeFile(stm);
|
||||||
|
stm.close();
|
||||||
|
|
||||||
|
// Get a handle to the datanode
|
||||||
|
DataNode[] dn = cluster.listDataNodes();
|
||||||
|
assertTrue("There should be only one datanode but found " + dn.length,
|
||||||
|
dn.length == 1);
|
||||||
|
|
||||||
|
LocatedBlocks locations = client.getNamenode().getBlockLocations(
|
||||||
|
file1.toString(), 0, Long.MAX_VALUE);
|
||||||
|
List<LocatedBlock> blocks = locations.getLocatedBlocks();
|
||||||
|
final FsDatasetSpi<?> fsd = dn[0].getFSDataset();
|
||||||
|
|
||||||
|
//
|
||||||
|
// Create hard links for a few of the blocks
|
||||||
|
//
|
||||||
|
for (int i = 0; i < blocks.size(); i = i + 2) {
|
||||||
|
ExtendedBlock b = blocks.get(i).getBlock();
|
||||||
|
final File f = FsDatasetTestUtil.getBlockFile(
|
||||||
|
fsd, b.getBlockPoolId(), b.getLocalBlock());
|
||||||
|
File link = new File(f.toString() + ".link");
|
||||||
|
System.out.println("Creating hardlink for File " + f + " to " + link);
|
||||||
|
HardLink.createHardLink(f, link);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Detach all blocks. This should remove hardlinks (if any)
|
||||||
|
for (int i = 0; i < blocks.size(); i++) {
|
||||||
|
ExtendedBlock b = blocks.get(i).getBlock();
|
||||||
|
System.out.println("breakHardlinksIfNeeded detaching block " + b);
|
||||||
|
assertTrue("breakHardlinksIfNeeded(" + b + ") should have returned true",
|
||||||
|
FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since the blocks were already detached earlier, these calls should
|
||||||
|
// return false
|
||||||
|
for (int i = 0; i < blocks.size(); i++) {
|
||||||
|
ExtendedBlock b = blocks.get(i).getBlock();
|
||||||
|
System.out.println("breakHardlinksIfNeeded re-attempting to " +
|
||||||
|
"detach block " + b);
|
||||||
|
assertTrue("breakHardlinksIfNeeded(" + b + ") should have returned false",
|
||||||
|
FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
client.close();
|
||||||
|
fs.close();
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test a simple flush on a simple HDFS file.
|
* Test a simple flush on a simple HDFS file.
|
||||||
* @throws IOException an exception might be thrown
|
* @throws IOException an exception might be thrown
|
||||||
|
|
|
@ -55,6 +55,12 @@ public class FsDatasetTestUtil {
|
||||||
.getGenerationStamp());
|
.getGenerationStamp());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean breakHardlinksIfNeeded(FsDatasetSpi<?> fsd,
|
||||||
|
ExtendedBlock block) throws IOException {
|
||||||
|
final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
|
||||||
|
return info.breakHardLinksIfNeeded();
|
||||||
|
}
|
||||||
|
|
||||||
public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
|
public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
|
||||||
final String bpid, final long blockId) {
|
final String bpid, final long blockId) {
|
||||||
return ((FsDatasetImpl)fsd).fetchReplicaInfo(bpid, blockId);
|
return ((FsDatasetImpl)fsd).fetchReplicaInfo(bpid, blockId);
|
||||||
|
|
Loading…
Reference in New Issue