Revert "HDFS-8860. Remove unused Replica copyOnWrite code (Lei (Eddy) Xu via Colin P. McCabe)"

This reverts commit a153b9601a.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
This commit is contained in:
Lei Xu 2015-12-10 10:57:33 -10:00
parent 21daa6c68a
commit 7f393a6f61
10 changed files with 294 additions and 4 deletions

View File

@ -1448,8 +1448,6 @@ Release 2.8.0 - UNRELEASED
HDFS-9019. Adding informative message to sticky bit permission denied
exception. (xyao)
HDFS-8860. Remove unused Replica copyOnWrite code (Lei (Eddy) Xu via Colin P. McCabe)
HDFS-8716. Introduce a new config specifically for safe mode block count
(Chang Li via kihwal)

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
* This class describes a replica that has been finalized.
*/
public class FinalizedReplica extends ReplicaInfo {
private boolean unlinked; // copy-on-write done for block
/**
* Constructor
@ -57,6 +58,7 @@ public class FinalizedReplica extends ReplicaInfo {
*/
public FinalizedReplica(FinalizedReplica from) {
super(from);
this.unlinked = from.isUnlinked();
}
@Override // ReplicaInfo
@ -64,6 +66,16 @@ public class FinalizedReplica extends ReplicaInfo {
return ReplicaState.FINALIZED;
}
@Override // ReplicaInfo
public boolean isUnlinked() {
return unlinked;
}
@Override // ReplicaInfo
public void setUnlinked() {
unlinked = true;
}
@Override
public long getVisibleLength() {
return getNumBytes(); // all bytes are visible
@ -86,6 +98,7 @@ public class FinalizedReplica extends ReplicaInfo {
@Override
public String toString() {
return super.toString();
return super.toString()
+ "\n unlinked =" + unlinked;
}
}

View File

@ -18,12 +18,18 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.LightWeightResizableGSet;
import com.google.common.annotations.VisibleForTesting;
@ -193,6 +199,22 @@ abstract public class ReplicaInfo extends Block
return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
}
/**
* check if this replica has already been unlinked.
* @return true if the replica has already been unlinked
* or no need to be detached; false otherwise
*/
public boolean isUnlinked() {
return true; // no need to be unlinked
}
/**
* set that this replica is unlinked
*/
public void setUnlinked() {
// no need to be unlinked
}
/**
* Number of bytes reserved for this replica on disk.
*/
@ -210,6 +232,72 @@ abstract public class ReplicaInfo extends Block
return 0;
}
/**
* Copy specified file into a temporary file. Then rename the
* temporary file to the original name. This will cause any
* hardlinks to the original file to be removed. The temporary
* files are created in the same directory. The temporary files will
* be recovered (especially on Windows) on datanode restart.
*/
private void unlinkFile(File file, Block b) throws IOException {
File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
try {
FileInputStream in = new FileInputStream(file);
try {
FileOutputStream out = new FileOutputStream(tmpFile);
try {
IOUtils.copyBytes(in, out, 16*1024);
} finally {
out.close();
}
} finally {
in.close();
}
if (file.length() != tmpFile.length()) {
throw new IOException("Copy of file " + file + " size " + file.length()+
" into file " + tmpFile +
" resulted in a size of " + tmpFile.length());
}
FileUtil.replaceFile(tmpFile, file);
} catch (IOException e) {
boolean done = tmpFile.delete();
if (!done) {
DataNode.LOG.info("detachFile failed to delete temporary file " +
tmpFile);
}
throw e;
}
}
/**
* Remove a hard link by copying the block to a temporary place and
* then moving it back
* @param numLinks number of hard links
* @return true if copy is successful;
* false if it is already detached or no need to be detached
* @throws IOException if there is any copy error
*/
public boolean unlinkBlock(int numLinks) throws IOException {
if (isUnlinked()) {
return false;
}
File file = getBlockFile();
if (file == null || getVolume() == null) {
throw new IOException("detachBlock:Block not found. " + this);
}
File meta = getMetaFile();
if (HardLink.getLinkCount(file) > numLinks) {
DataNode.LOG.info("CopyOnWrite for block " + this);
unlinkFile(file, this);
}
if (HardLink.getLinkCount(meta) > numLinks) {
unlinkFile(meta, this);
}
setUnlinked();
return true;
}
@Override //Object
public String toString() {
return getClass().getSimpleName()

View File

@ -85,6 +85,16 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
public ReplicaInfo getOriginalReplica() {
return original;
}
@Override //ReplicaInfo
public boolean isUnlinked() {
return original.isUnlinked();
}
@Override //ReplicaInfo
public void setUnlinked() {
original.setUnlinked();
}
@Override //ReplicaInfo
public ReplicaState getState() {

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
* lease recovery.
*/
public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
private boolean unlinked; // copy-on-write done for block
/**
* Constructor
@ -63,6 +64,7 @@ public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
*/
public ReplicaWaitingToBeRecovered(ReplicaWaitingToBeRecovered from) {
super(from);
this.unlinked = from.isUnlinked();
}
@Override //ReplicaInfo
@ -70,6 +72,16 @@ public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
return ReplicaState.RWR;
}
@Override //ReplicaInfo
public boolean isUnlinked() {
return unlinked;
}
@Override //ReplicaInfo
public void setUnlinked() {
unlinked = true;
}
@Override //ReplicaInfo
public long getVisibleLength() {
return -1; //no bytes are visible
@ -92,6 +104,7 @@ public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
@Override
public String toString() {
return super.toString();
return super.toString()
+ "\n unlinked=" + unlinked;
}
}

View File

@ -1109,6 +1109,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throws IOException {
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
// unlink the finalized replica
replicaInfo.unlinkBlock(1);
// construct a RBW replica with the new GS
File blkfile = replicaInfo.getBlockFile();
@ -2478,6 +2480,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ ", rur=" + rur);
}
if (rur.getNumBytes() > newlength) {
rur.unlinkBlock(1);
truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
if(!copyOnTruncate) {
// update RUR with the new length

View File

@ -109,6 +109,78 @@ public class TestFileAppend{
expected, "Read 1", false);
}
/**
* Test that copy on write for blocks works correctly
* @throws IOException an exception might be thrown
*/
@Test
public void testCopyOnWrite() throws IOException {
Configuration conf = new HdfsConfiguration();
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem();
InetSocketAddress addr = new InetSocketAddress("localhost",
cluster.getNameNodePort());
DFSClient client = new DFSClient(addr, conf);
try {
// create a new file, write to it and close it.
//
Path file1 = new Path("/filestatus.dat");
FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
writeFile(stm);
stm.close();
// Get a handle to the datanode
DataNode[] dn = cluster.listDataNodes();
assertTrue("There should be only one datanode but found " + dn.length,
dn.length == 1);
LocatedBlocks locations = client.getNamenode().getBlockLocations(
file1.toString(), 0, Long.MAX_VALUE);
List<LocatedBlock> blocks = locations.getLocatedBlocks();
//
// Create hard links for a few of the blocks
//
for (int i = 0; i < blocks.size(); i = i + 2) {
ExtendedBlock b = blocks.get(i).getBlock();
final File f = DataNodeTestUtils.getFile(dn[0],
b.getBlockPoolId(), b.getLocalBlock().getBlockId());
File link = new File(f.toString() + ".link");
System.out.println("Creating hardlink for File " + f + " to " + link);
HardLink.createHardLink(f, link);
}
//
// Detach all blocks. This should remove hardlinks (if any)
//
for (int i = 0; i < blocks.size(); i++) {
ExtendedBlock b = blocks.get(i).getBlock();
System.out.println("testCopyOnWrite detaching block " + b);
assertTrue("Detaching block " + b + " should have returned true",
DataNodeTestUtils.unlinkBlock(dn[0], b, 1));
}
// Since the blocks were already detached earlier, these calls should
// return false
//
for (int i = 0; i < blocks.size(); i++) {
ExtendedBlock b = blocks.get(i).getBlock();
System.out.println("testCopyOnWrite detaching block " + b);
assertTrue("Detaching block " + b + " should have returned false",
!DataNodeTestUtils.unlinkBlock(dn[0], b, 1));
}
} finally {
client.close();
fs.close();
cluster.shutdown();
}
}
/**
* Test a simple flush on a simple HDFS file.
* @throws IOException an exception might be thrown

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
@ -158,6 +159,20 @@ public class DataNodeTestUtils {
return dn.getFSDataset();
}
public static File getFile(DataNode dn, String bpid, long bid) {
return FsDatasetTestUtil.getFile(dn.getFSDataset(), bpid, bid);
}
public static File getBlockFile(DataNode dn, String bpid, Block b
) throws IOException {
return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b);
}
public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks
) throws IOException {
return FsDatasetTestUtil.unlinkBlock(dn.getFSDataset(), bk, numLinks);
}
/**
* Fetch a copy of ReplicaInfo from a datanode by block id
* @param dn datanode to retrieve a replicainfo object from

View File

@ -54,6 +54,12 @@ public class FsDatasetTestUtil {
return FsDatasetUtil.getMetaFile(getBlockFile(fsd, bpid, b), b
.getGenerationStamp());
}
public static boolean unlinkBlock(FsDatasetSpi<?> fsd,
ExtendedBlock block, int numLinks) throws IOException {
final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
return info.unlinkBlock(numLinks);
}
public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
final String bpid, final long blockId) {

View File

@ -143,7 +143,79 @@ public class TestDatanodeRestart {
}
}
// test recovering unlinked tmp replicas
@Test public void testRecoverReplicas() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024L);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
try {
FileSystem fs = cluster.getFileSystem();
for (int i=0; i<4; i++) {
Path fileName = new Path("/test"+i);
DFSTestUtil.createFile(fs, fileName, 1, (short)1, 0L);
DFSTestUtil.waitReplication(fs, fileName, (short)1);
}
String bpid = cluster.getNamesystem().getBlockPoolId();
DataNode dn = cluster.getDataNodes().get(0);
Iterator<ReplicaInfo> replicasItor =
dataset(dn).volumeMap.replicas(bpid).iterator();
ReplicaInfo replica = replicasItor.next();
createUnlinkTmpFile(replica, true, true); // rename block file
createUnlinkTmpFile(replica, false, true); // rename meta file
replica = replicasItor.next();
createUnlinkTmpFile(replica, true, false); // copy block file
createUnlinkTmpFile(replica, false, false); // copy meta file
replica = replicasItor.next();
createUnlinkTmpFile(replica, true, true); // rename block file
createUnlinkTmpFile(replica, false, false); // copy meta file
cluster.restartDataNodes();
cluster.waitActive();
dn = cluster.getDataNodes().get(0);
// check volumeMap: 4 finalized replica
Collection<ReplicaInfo> replicas = dataset(dn).volumeMap.replicas(bpid);
Assert.assertEquals(4, replicas.size());
replicasItor = replicas.iterator();
while (replicasItor.hasNext()) {
Assert.assertEquals(ReplicaState.FINALIZED,
replicasItor.next().getState());
}
} finally {
cluster.shutdown();
}
}
private static FsDatasetImpl dataset(DataNode dn) {
return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
}
private static void createUnlinkTmpFile(ReplicaInfo replicaInfo,
boolean changeBlockFile,
boolean isRename) throws IOException {
File src;
if (changeBlockFile) {
src = replicaInfo.getBlockFile();
} else {
src = replicaInfo.getMetaFile();
}
File dst = DatanodeUtil.getUnlinkTmpFile(src);
if (isRename) {
src.renameTo(dst);
} else {
FileInputStream in = new FileInputStream(src);
try {
FileOutputStream out = new FileOutputStream(dst);
try {
IOUtils.copyBytes(in, out, 1);
} finally {
out.close();
}
} finally {
in.close();
}
}
}
}