HDFS-7703. Support favouredNodes for the append for new blocks ( Contributed by Vinayakumar B)

(cherry picked from commit 89a5449280)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
Vinayakumar B 2015-02-12 12:38:44 +05:30
parent 54e33baaf6
commit 91a5d92916
5 changed files with 121 additions and 14 deletions

View File

@ -324,6 +324,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7761. cleanup unnecssary code logic in LocatedBlock. (yliu)
HDFS-7703. Support favouredNodes for the append for new blocks
(vinayakumarb)
HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)
HDFS-7684. The host:port settings of the daemons should be trimmed before

View File

@ -1693,6 +1693,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
buffersize, dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes));
beginFileLease(result.getFileId(), result);
return result;
}
private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) {
String[] favoredNodeStrs = null;
if (favoredNodes != null) {
favoredNodeStrs = new String[favoredNodes.length];
@ -1702,12 +1711,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
+ favoredNodes[i].getPort();
}
}
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
buffersize, dfsClientConf.createChecksum(checksumOpt),
favoredNodeStrs);
beginFileLease(result.getFileId(), result);
return result;
return favoredNodeStrs;
}
/**
@ -1725,7 +1729,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
return null;
}
return callAppend(src, buffersize, flag, progress);
return callAppend(src, buffersize, flag, progress, null);
}
return null;
}
@ -1804,7 +1808,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
/** Method to get stream returned by append call */
private DFSOutputStream callAppend(String src, int buffersize,
EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes)
throws IOException {
CreateFlag.validateForAppend(flag);
try {
LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
@ -1812,7 +1817,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return DFSOutputStream.newStreamForAppend(this, src,
flag.contains(CreateFlag.NEW_BLOCK),
buffersize, progress, blkWithStatus.getLastBlock(),
blkWithStatus.getFileStatus(), dfsClientConf.createChecksum());
blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(), favoredNodes);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
@ -1840,14 +1845,38 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public HdfsDataOutputStream append(final String src, final int buffersize,
EnumSet<CreateFlag> flag, final Progressable progress,
final FileSystem.Statistics statistics) throws IOException {
final DFSOutputStream out = append(src, buffersize, flag, progress);
final DFSOutputStream out = append(src, buffersize, flag, null, progress);
return createWrappedOutputStream(out, statistics, out.getInitialLen());
}
/**
* Append to an existing HDFS file.
*
* @param src file name
* @param buffersize buffer size
* @param flag indicates whether to append data to a new block instead of the
* last block
* @param progress for reporting write-progress; null is acceptable.
* @param statistics file system statistics; null is acceptable.
* @param favoredNodes FavoredNodes for new blocks
* @return an output stream for writing into the file
* @see ClientProtocol#append(String, String, EnumSetWritable)
*/
public HdfsDataOutputStream append(final String src, final int buffersize,
EnumSet<CreateFlag> flag, final Progressable progress,
final FileSystem.Statistics statistics,
final InetSocketAddress[] favoredNodes) throws IOException {
final DFSOutputStream out = append(src, buffersize, flag,
getFavoredNodesStr(favoredNodes), progress);
return createWrappedOutputStream(out, statistics, out.getInitialLen());
}
private DFSOutputStream append(String src, int buffersize,
EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress)
throws IOException {
checkOpen();
final DFSOutputStream result = callAppend(src, buffersize, flag, progress);
final DFSOutputStream result = callAppend(src, buffersize, flag, progress,
favoredNodes);
beginFileLease(result.getFileId(), result);
return result;
}

View File

@ -1822,10 +1822,13 @@ public class DFSOutputStream extends FSOutputSummer
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
boolean toNewBlock, int bufferSize, Progressable progress,
LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum)
throws IOException {
LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
String[] favoredNodes) throws IOException {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
progress, lastBlock, stat, checksum);
if (favoredNodes != null && favoredNodes.length != 0) {
out.streamer.setFavoredNodes(favoredNodes);
}
out.start();
return out;
}

View File

@ -317,6 +317,17 @@ public class DistributedFileSystem extends FileSystem {
return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
}
/**
* Append to an existing file (optional operation).
*
* @param f the existing file to be appended.
* @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
* to be present.
* @param bufferSize the size of the buffer to be used.
* @param progress for reporting progress if it is not null.
* @return Returns instance of {@link FSDataOutputStream}
* @throws IOException
*/
public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
final int bufferSize, final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
@ -336,6 +347,38 @@ public class DistributedFileSystem extends FileSystem {
}.resolve(this, absF);
}
/**
* Append to an existing file (optional operation).
*
* @param f the existing file to be appended.
* @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
* to be present.
* @param bufferSize the size of the buffer to be used.
* @param progress for reporting progress if it is not null.
* @param favoredNodes Favored nodes for new blocks
* @return Returns instance of {@link FSDataOutputStream}
* @throws IOException
*/
public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
final int bufferSize, final Progressable progress,
final InetSocketAddress[] favoredNodes) throws IOException {
statistics.incrementWriteOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
@Override
public FSDataOutputStream doCall(final Path p)
throws IOException {
return dfs.append(getPathName(p), bufferSize, flag, progress,
statistics, favoredNodes);
}
@Override
public FSDataOutputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.append(p, bufferSize);
}
}.resolve(this, absF);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,

View File

@ -26,12 +26,14 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Random;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@ -159,6 +161,33 @@ public class TestFavoredNodesEndToEnd {
}
}
@Test(timeout = 180000)
public void testFavoredNodesEndToEndForAppend() throws Exception {
// create 10 files with random preferred nodes
for (int i = 0; i < NUM_FILES; i++) {
Random rand = new Random(System.currentTimeMillis() + i);
// pass a new created rand so as to get a uniform distribution each time
// without too much collisions (look at the do-while loop in getDatanodes)
InetSocketAddress datanode[] = getDatanodes(rand);
Path p = new Path("/filename" + i);
// create and close the file.
dfs.create(p, FsPermission.getDefault(), true, 4096, (short) 3, 4096L,
null, null).close();
// re-open for append
FSDataOutputStream out = dfs.append(p, EnumSet.of(CreateFlag.APPEND),
4096, null, datanode);
out.write(SOME_BYTES);
out.close();
BlockLocation[] locations = getBlockLocations(p);
// verify the files got created in the right nodes
for (BlockLocation loc : locations) {
String[] hosts = loc.getNames();
String[] hosts1 = getStringForInetSocketAddrs(datanode);
assertTrue(compareNodes(hosts, hosts1));
}
}
}
private BlockLocation[] getBlockLocations(Path p) throws Exception {
DFSTestUtil.waitReplication(dfs, p, (short)3);
BlockLocation[] locations = dfs.getClient().getBlockLocations(