From 91a5d92916c6c1b0475d5794c3855b53b020d4ec Mon Sep 17 00:00:00 2001 From: Vinayakumar B Date: Thu, 12 Feb 2015 12:38:44 +0530 Subject: [PATCH] HDFS-7703. Support favouredNodes for the append for new blocks ( Contributed by Vinayakumar B) (cherry picked from commit 89a544928083501625bc69f96b530040228f0a5f) Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 ++ .../org/apache/hadoop/hdfs/DFSClient.java | 53 ++++++++++++++----- .../apache/hadoop/hdfs/DFSOutputStream.java | 7 ++- .../hadoop/hdfs/DistributedFileSystem.java | 43 +++++++++++++++ .../namenode/TestFavoredNodesEndToEnd.java | 29 ++++++++++ 5 files changed, 121 insertions(+), 14 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6bfa34ca411..a1b20538b2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -324,6 +324,9 @@ Release 2.7.0 - UNRELEASED HDFS-7761. cleanup unnecssary code logic in LocatedBlock. (yliu) + HDFS-7703. Support favouredNodes for the append for new blocks + (vinayakumarb) + HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe) HDFS-7684. The host:port settings of the daemons should be trimmed before diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index d27197f867e..3c0ec997959 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1693,6 +1693,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, if(LOG.isDebugEnabled()) { LOG.debug(src + ": masked=" + masked); } + final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, + src, masked, flag, createParent, replication, blockSize, progress, + buffersize, dfsClientConf.createChecksum(checksumOpt), + getFavoredNodesStr(favoredNodes)); + beginFileLease(result.getFileId(), result); + return result; + } + + private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) { String[] favoredNodeStrs = null; if (favoredNodes != null) { favoredNodeStrs = new String[favoredNodes.length]; @@ -1702,12 +1711,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, + favoredNodes[i].getPort(); } } - final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, - src, masked, flag, createParent, replication, blockSize, progress, - buffersize, dfsClientConf.createChecksum(checksumOpt), - favoredNodeStrs); - beginFileLease(result.getFileId(), result); - return result; + return favoredNodeStrs; } /** @@ -1725,7 +1729,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } return null; } - return callAppend(src, buffersize, flag, progress); + return callAppend(src, buffersize, flag, progress, null); } return null; } @@ -1804,7 +1808,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, /** Method to get stream returned by append call */ private DFSOutputStream callAppend(String src, int buffersize, - EnumSet flag, Progressable progress) throws IOException { + EnumSet flag, Progressable progress, String[] favoredNodes) + throws IOException { CreateFlag.validateForAppend(flag); try { LastBlockWithStatus blkWithStatus = namenode.append(src, clientName, @@ -1812,7 +1817,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, return DFSOutputStream.newStreamForAppend(this, src, flag.contains(CreateFlag.NEW_BLOCK), buffersize, progress, blkWithStatus.getLastBlock(), - blkWithStatus.getFileStatus(), dfsClientConf.createChecksum()); + blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(), favoredNodes); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, @@ -1840,14 +1845,38 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public HdfsDataOutputStream append(final String src, final int buffersize, EnumSet flag, final Progressable progress, final FileSystem.Statistics statistics) throws IOException { - final DFSOutputStream out = append(src, buffersize, flag, progress); + final DFSOutputStream out = append(src, buffersize, flag, null, progress); + return createWrappedOutputStream(out, statistics, out.getInitialLen()); + } + + /** + * Append to an existing HDFS file. + * + * @param src file name + * @param buffersize buffer size + * @param flag indicates whether to append data to a new block instead of the + * last block + * @param progress for reporting write-progress; null is acceptable. + * @param statistics file system statistics; null is acceptable. + * @param favoredNodes FavoredNodes for new blocks + * @return an output stream for writing into the file + * @see ClientProtocol#append(String, String, EnumSetWritable) + */ + public HdfsDataOutputStream append(final String src, final int buffersize, + EnumSet flag, final Progressable progress, + final FileSystem.Statistics statistics, + final InetSocketAddress[] favoredNodes) throws IOException { + final DFSOutputStream out = append(src, buffersize, flag, + getFavoredNodesStr(favoredNodes), progress); return createWrappedOutputStream(out, statistics, out.getInitialLen()); } private DFSOutputStream append(String src, int buffersize, - EnumSet flag, Progressable progress) throws IOException { + EnumSet flag, String[] favoredNodes, Progressable progress) + throws IOException { checkOpen(); - final DFSOutputStream result = callAppend(src, buffersize, flag, progress); + final DFSOutputStream result = callAppend(src, buffersize, flag, progress, + favoredNodes); beginFileLease(result.getFileId(), result); return result; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 47b6f363e8a..3ed957bd2e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -1822,10 +1822,13 @@ public class DFSOutputStream extends FSOutputSummer static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, boolean toNewBlock, int bufferSize, Progressable progress, - LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum) - throws IOException { + LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, + String[] favoredNodes) throws IOException { final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock, progress, lastBlock, stat, checksum); + if (favoredNodes != null && favoredNodes.length != 0) { + out.streamer.setFavoredNodes(favoredNodes); + } out.start(); return out; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 2cecdfbface..136ef156abe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -317,6 +317,17 @@ public class DistributedFileSystem extends FileSystem { return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress); } + /** + * Append to an existing file (optional operation). + * + * @param f the existing file to be appended. + * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory + * to be present. + * @param bufferSize the size of the buffer to be used. + * @param progress for reporting progress if it is not null. + * @return Returns instance of {@link FSDataOutputStream} + * @throws IOException + */ public FSDataOutputStream append(Path f, final EnumSet flag, final int bufferSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); @@ -336,6 +347,38 @@ public class DistributedFileSystem extends FileSystem { }.resolve(this, absF); } + /** + * Append to an existing file (optional operation). + * + * @param f the existing file to be appended. + * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory + * to be present. + * @param bufferSize the size of the buffer to be used. + * @param progress for reporting progress if it is not null. + * @param favoredNodes Favored nodes for new blocks + * @return Returns instance of {@link FSDataOutputStream} + * @throws IOException + */ + public FSDataOutputStream append(Path f, final EnumSet flag, + final int bufferSize, final Progressable progress, + final InetSocketAddress[] favoredNodes) throws IOException { + statistics.incrementWriteOps(1); + Path absF = fixRelativePart(f); + return new FileSystemLinkResolver() { + @Override + public FSDataOutputStream doCall(final Path p) + throws IOException { + return dfs.append(getPathName(p), bufferSize, flag, progress, + statistics, favoredNodes); + } + @Override + public FSDataOutputStream next(final FileSystem fs, final Path p) + throws IOException { + return fs.append(p, bufferSize); + } + }.resolve(this, absF); + } + @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java index 4f110372be8..2d39896b1e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java @@ -26,12 +26,14 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.Random; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -159,6 +161,33 @@ public class TestFavoredNodesEndToEnd { } } + @Test(timeout = 180000) + public void testFavoredNodesEndToEndForAppend() throws Exception { + // create 10 files with random preferred nodes + for (int i = 0; i < NUM_FILES; i++) { + Random rand = new Random(System.currentTimeMillis() + i); + // pass a new created rand so as to get a uniform distribution each time + // without too much collisions (look at the do-while loop in getDatanodes) + InetSocketAddress datanode[] = getDatanodes(rand); + Path p = new Path("/filename" + i); + // create and close the file. + dfs.create(p, FsPermission.getDefault(), true, 4096, (short) 3, 4096L, + null, null).close(); + // re-open for append + FSDataOutputStream out = dfs.append(p, EnumSet.of(CreateFlag.APPEND), + 4096, null, datanode); + out.write(SOME_BYTES); + out.close(); + BlockLocation[] locations = getBlockLocations(p); + // verify the files got created in the right nodes + for (BlockLocation loc : locations) { + String[] hosts = loc.getNames(); + String[] hosts1 = getStringForInetSocketAddrs(datanode); + assertTrue(compareNodes(hosts, hosts1)); + } + } + } + private BlockLocation[] getBlockLocations(Path p) throws Exception { DFSTestUtil.waitReplication(dfs, p, (short)3); BlockLocation[] locations = dfs.getClient().getBlockLocations(