HDFS-9589. Block files which have been hardlinked should be duplicated before the DataNode appends to the them (cmccabe)
(cherry picked from commit bb540ba85aa37d9fe31e640665158afe8a936230)
(cherry picked from commit a56c5ef636413052fec62cee1a38b7e1455e389d)
parent 546178a5de
commit 9218890c7f
@@ -1620,6 +1620,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9484. NNThroughputBenchmark$BlockReportStats should not send empty
     block reports. (Mingliang Liu via shv)
 
+    HDFS-9589. Block files which have been hardlinked should be duplicated
+    before the DataNode appends to the them (cmccabe)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -18,12 +18,18 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.LightWeightResizableGSet;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -210,6 +216,79 @@ public long getOriginalBytesReserved() {
     return 0;
   }
 
+  /**
+   * Copy specified file into a temporary file. Then rename the
+   * temporary file to the original name. This will cause any
+   * hardlinks to the original file to be removed. The temporary
+   * files are created in the same directory. The temporary files will
+   * be recovered (especially on Windows) on datanode restart.
+   */
+  private void breakHardlinks(File file, Block b) throws IOException {
+    File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
+    try {
+      FileInputStream in = new FileInputStream(file);
+      try {
+        FileOutputStream out = new FileOutputStream(tmpFile);
+        try {
+          IOUtils.copyBytes(in, out, 16 * 1024);
+        } finally {
+          out.close();
+        }
+      } finally {
+        in.close();
+      }
+      if (file.length() != tmpFile.length()) {
+        throw new IOException("Copy of file " + file + " size " + file.length()+
+            " into file " + tmpFile +
+            " resulted in a size of " + tmpFile.length());
+      }
+      FileUtil.replaceFile(tmpFile, file);
+    } catch (IOException e) {
+      boolean done = tmpFile.delete();
+      if (!done) {
+        DataNode.LOG.info("detachFile failed to delete temporary file " +
+            tmpFile);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * This function "breaks hardlinks" to the current replica file.
+   *
+   * When doing a DataNode upgrade, we create a bunch of hardlinks to each block
+   * file. This cleverly ensures that both the old and the new storage
+   * directories can contain the same block file, without using additional space
+   * for the data.
+   *
+   * However, when we want to append to the replica file, we need to "break" the
+   * hardlink to ensure that the old snapshot continues to contain the old data
+   * length. If we failed to do that, we could roll back to the previous/
+   * directory during a downgrade, and find that the block contents were longer
+   * than they were at the time of upgrade.
+   *
+   * @return true only if data was copied.
+   * @throws IOException
+   */
+  public boolean breakHardLinksIfNeeded() throws IOException {
+    File file = getBlockFile();
+    if (file == null || getVolume() == null) {
+      throw new IOException("detachBlock:Block not found. " + this);
+    }
+    File meta = getMetaFile();
+
+    int linkCount = HardLink.getLinkCount(file);
+    if (linkCount > 1) {
+      DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
+          "block " + this);
+      breakHardlinks(file, this);
+    }
+    if (HardLink.getLinkCount(meta) > 1) {
+      breakHardlinks(meta, this);
+    }
+    return true;
+  }
+
   @Override //Object
   public String toString() {
     return getClass().getSimpleName()
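Aside (not part of the commit): the copy-then-rename trick that breakHardlinks() above relies on can be sketched on its own with plain java.nio.file calls. The class and file names below are illustrative only, and errors are simply propagated rather than handled the way the DataNode does.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class BreakHardLinkSketch {
  // Copies "file" to a temporary sibling and renames the copy over the
  // original name. Any other hardlinked names keep pointing at the old
  // inode, so later appends through "file" cannot change what those
  // names see.
  static void breakHardLink(Path file) throws IOException {
    Path tmp = file.resolveSibling(file.getFileName() + ".unlink.tmp");
    Files.copy(file, tmp, StandardCopyOption.REPLACE_EXISTING);
    Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
  }

  public static void main(String[] args) throws IOException {
    breakHardLink(Paths.get(args[0]));  // e.g. the path of a block file
  }
}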
@@ -1134,6 +1134,10 @@ private synchronized ReplicaBeingWritten append(String bpid,
     // If the block is cached, start uncaching it.
     cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
 
+    // If there are any hardlinks to the block, break them. This ensures we are
+    // not appending to a file that is part of a previous/ directory.
+    replicaInfo.breakHardLinksIfNeeded();
+
     // construct a RBW replica with the new GS
     File blkfile = replicaInfo.getBlockFile();
     FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
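Aside (not part of the commit): whether anything needs to be copied hinges on the file's hard link count, which breakHardLinksIfNeeded() reads through HardLink.getLinkCount(). On a Unix-like system the same count can be read directly through NIO, as in this illustrative sketch; the class name is hypothetical.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class LinkCountSketch {
  // Reads the number of directory entries (hard links) pointing at the
  // file's inode via the "unix" attribute view; requires a Unix-like OS.
  static int linkCount(Path file) throws IOException {
    Number nlink = (Number) Files.getAttribute(file, "unix:nlink");
    return nlink.intValue();
  }

  public static void main(String[] args) throws IOException {
    Path block = Paths.get(args[0]);  // e.g. the path of a block file
    System.out.println(block + " has " + linkCount(block) + " hard link(s)");
  }
}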
@@ -2498,6 +2502,7 @@ private FinalizedReplica updateReplicaUnderRecovery(
           + ", rur=" + rur);
     }
     if (rur.getNumBytes() > newlength) {
+      rur.breakHardLinksIfNeeded();
       truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
       if(!copyOnTruncate) {
         // update RUR with the new length
@@ -43,6 +43,8 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.Assert;
@@ -109,6 +111,70 @@ private void checkFile(DistributedFileSystem fileSys, Path name, int repl)
         expected, "Read 1", false);
   }
 
+  @Test
+  public void testBreakHardlinksIfNeeded() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    if (simulatedStorage) {
+      SimulatedFSDataset.setFactory(conf);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    FileSystem fs = cluster.getFileSystem();
+    InetSocketAddress addr = new InetSocketAddress("localhost",
+        cluster.getNameNodePort());
+    DFSClient client = new DFSClient(addr, conf);
+    try {
+      // create a new file, write to it and close it.
+      Path file1 = new Path("/filestatus.dat");
+      FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
+      writeFile(stm);
+      stm.close();
+
+      // Get a handle to the datanode
+      DataNode[] dn = cluster.listDataNodes();
+      assertTrue("There should be only one datanode but found " + dn.length,
+          dn.length == 1);
+
+      LocatedBlocks locations = client.getNamenode().getBlockLocations(
+          file1.toString(), 0, Long.MAX_VALUE);
+      List<LocatedBlock> blocks = locations.getLocatedBlocks();
+      final FsDatasetSpi<?> fsd = dn[0].getFSDataset();
+
+      //
+      // Create hard links for a few of the blocks
+      //
+      for (int i = 0; i < blocks.size(); i = i + 2) {
+        ExtendedBlock b = blocks.get(i).getBlock();
+        final File f = FsDatasetTestUtil.getBlockFile(
+            fsd, b.getBlockPoolId(), b.getLocalBlock());
+        File link = new File(f.toString() + ".link");
+        System.out.println("Creating hardlink for File " + f + " to " + link);
+        HardLink.createHardLink(f, link);
+      }
+
+      // Detach all blocks. This should remove hardlinks (if any)
+      for (int i = 0; i < blocks.size(); i++) {
+        ExtendedBlock b = blocks.get(i).getBlock();
+        System.out.println("breakHardlinksIfNeeded detaching block " + b);
+        assertTrue("breakHardlinksIfNeeded(" + b + ") should have returned true",
+            FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b));
+      }
+
+      // Since the blocks were already detached earlier, these calls should
+      // return false
+      for (int i = 0; i < blocks.size(); i++) {
+        ExtendedBlock b = blocks.get(i).getBlock();
+        System.out.println("breakHardlinksIfNeeded re-attempting to " +
+            "detach block " + b);
+        assertTrue("breakHardlinksIfNeeded(" + b + ") should have returned false",
+            FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b));
+      }
+    } finally {
+      client.close();
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Test a simple flush on a simple HDFS file.
    * @throws IOException an exception might be thrown
@@ -55,6 +55,12 @@ public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)
         .getGenerationStamp());
   }
 
+  public static boolean breakHardlinksIfNeeded(FsDatasetSpi<?> fsd,
+      ExtendedBlock block) throws IOException {
+    final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
+    return info.breakHardLinksIfNeeded();
+  }
+
   public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
       final String bpid, final long blockId) {
     return ((FsDatasetImpl)fsd).fetchReplicaInfo(bpid, blockId);