HDFS-11170. Add builder-based create API to FileSystem. Contributed by SammiChen and Wei Zhou.
This commit is contained in:
parent
c97da481d7
commit
10deafee82
|
@ -0,0 +1,142 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
||||||
|
|
||||||
|
/** Base of specific file system FSDataOutputStreamBuilder. */
|
||||||
|
public class FSDataOutputStreamBuilder{
|
||||||
|
private Path path = null;
|
||||||
|
private FsPermission permission = null;
|
||||||
|
private Integer bufferSize;
|
||||||
|
private Short replication;
|
||||||
|
private Long blockSize;
|
||||||
|
private Progressable progress = null;
|
||||||
|
private EnumSet<CreateFlag> flags = null;
|
||||||
|
private ChecksumOpt checksumOpt = null;
|
||||||
|
|
||||||
|
private final FileSystem fs;
|
||||||
|
|
||||||
|
public FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
|
||||||
|
fs = fileSystem;
|
||||||
|
path = p;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Path getPath() {
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected FsPermission getPermission() {
|
||||||
|
if (permission == null) {
|
||||||
|
return FsPermission.getFileDefault();
|
||||||
|
}
|
||||||
|
return permission;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSDataOutputStreamBuilder setPermission(final FsPermission perm) {
|
||||||
|
Preconditions.checkNotNull(perm);
|
||||||
|
permission = perm;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected int getBufferSize() {
|
||||||
|
if (bufferSize == null) {
|
||||||
|
return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
|
||||||
|
IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||||
|
}
|
||||||
|
return bufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSDataOutputStreamBuilder setBufferSize(int bufSize) {
|
||||||
|
bufferSize = bufSize;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected short getReplication() {
|
||||||
|
if (replication == null) {
|
||||||
|
return fs.getDefaultReplication(getPath());
|
||||||
|
}
|
||||||
|
return replication;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSDataOutputStreamBuilder setReplication(short replica) {
|
||||||
|
replication = replica;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected long getBlockSize() {
|
||||||
|
if (blockSize == null) {
|
||||||
|
return fs.getDefaultBlockSize(getPath());
|
||||||
|
}
|
||||||
|
return blockSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSDataOutputStreamBuilder setBlockSize(long blkSize) {
|
||||||
|
blockSize = blkSize;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Progressable getProgress() {
|
||||||
|
return progress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSDataOutputStreamBuilder setProgress(final Progressable prog) {
|
||||||
|
Preconditions.checkNotNull(prog);
|
||||||
|
progress = prog;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected EnumSet<CreateFlag> getFlags() {
|
||||||
|
if (flags == null) {
|
||||||
|
return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
|
||||||
|
}
|
||||||
|
return flags;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSDataOutputStreamBuilder setFlags(
|
||||||
|
final EnumSet<CreateFlag> enumFlags) {
|
||||||
|
Preconditions.checkNotNull(enumFlags);
|
||||||
|
flags = enumFlags;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ChecksumOpt getChecksumOpt() {
|
||||||
|
return checksumOpt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSDataOutputStreamBuilder setChecksumOpt(
|
||||||
|
final ChecksumOpt chksumOpt) {
|
||||||
|
Preconditions.checkNotNull(chksumOpt);
|
||||||
|
checksumOpt = chksumOpt;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSDataOutputStream build() throws IOException {
|
||||||
|
return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(),
|
||||||
|
getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
|
||||||
|
}
|
||||||
|
}
|
|
@ -4122,4 +4122,13 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
public static GlobalStorageStatistics getGlobalStorageStatistics() {
|
public static GlobalStorageStatistics getGlobalStorageStatistics() {
|
||||||
return GlobalStorageStatistics.INSTANCE;
|
return GlobalStorageStatistics.INSTANCE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new FSDataOutputStreamBuilder for the file with path.
|
||||||
|
* @param path file path
|
||||||
|
* @return a FSDataOutputStreamBuilder object to build the file
|
||||||
|
*/
|
||||||
|
public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
|
||||||
|
return new FSDataOutputStreamBuilder(this, path);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -665,4 +665,9 @@ public class FilterFileSystem extends FileSystem {
|
||||||
public Collection<FileStatus> getTrashRoots(boolean allUsers) {
|
public Collection<FileStatus> getTrashRoots(boolean allUsers) {
|
||||||
return fs.getTrashRoots(allUsers);
|
return fs.getTrashRoots(allUsers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
|
||||||
|
return fs.newFSDataOutputStreamBuilder(path);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1268,4 +1268,9 @@ public class HarFileSystem extends FileSystem {
|
||||||
public short getDefaultReplication(Path f) {
|
public short getDefaultReplication(Path f) {
|
||||||
return fs.getDefaultReplication(f);
|
return fs.getDefaultReplication(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
|
||||||
|
return fs.newFSDataOutputStreamBuilder(path);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,10 +19,13 @@ package org.apache.hadoop.fs;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
||||||
import static org.apache.hadoop.fs.FileSystemTestHelper.*;
|
import static org.apache.hadoop.fs.FileSystemTestHelper.*;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
|
@ -631,4 +634,55 @@ public class TestLocalFileSystem {
|
||||||
FileStatus[] stats = fs.listStatus(path);
|
FileStatus[] stats = fs.listStatus(path);
|
||||||
assertTrue(stats != null && stats.length == 1 && stats[0] == stat);
|
assertTrue(stats != null && stats.length == 1 && stats[0] == stat);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFSOutputStreamBuilder() throws Exception {
|
||||||
|
Path path = new Path(TEST_ROOT_DIR, "testBuilder");
|
||||||
|
|
||||||
|
try {
|
||||||
|
FSDataOutputStreamBuilder builder =
|
||||||
|
fileSys.newFSDataOutputStreamBuilder(path);
|
||||||
|
FSDataOutputStream out = builder.build();
|
||||||
|
String content = "Create with a generic type of createBuilder!";
|
||||||
|
byte[] contentOrigin = content.getBytes("UTF8");
|
||||||
|
out.write(contentOrigin);
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
FSDataInputStream input = fileSys.open(path);
|
||||||
|
byte[] buffer =
|
||||||
|
new byte[(int) (fileSys.getFileStatus(path).getLen())];
|
||||||
|
input.readFully(0, buffer);
|
||||||
|
input.close();
|
||||||
|
Assert.assertArrayEquals("The data be read should equals with the "
|
||||||
|
+ "data written.", contentOrigin, buffer);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test value not being set for replication, block size, buffer size
|
||||||
|
// and permission
|
||||||
|
FSDataOutputStreamBuilder builder =
|
||||||
|
fileSys.newFSDataOutputStreamBuilder(path);
|
||||||
|
builder.build();
|
||||||
|
Assert.assertEquals("Should be default block size",
|
||||||
|
builder.getBlockSize(), fileSys.getDefaultBlockSize());
|
||||||
|
Assert.assertEquals("Should be default replication factor",
|
||||||
|
builder.getReplication(), fileSys.getDefaultReplication());
|
||||||
|
Assert.assertEquals("Should be default buffer size",
|
||||||
|
builder.getBufferSize(),
|
||||||
|
fileSys.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
|
||||||
|
IO_FILE_BUFFER_SIZE_DEFAULT));
|
||||||
|
Assert.assertEquals("Should be default permission",
|
||||||
|
builder.getPermission(), FsPermission.getFileDefault());
|
||||||
|
|
||||||
|
// Test set 0 to replication, block size and buffer size
|
||||||
|
builder = fileSys.newFSDataOutputStreamBuilder(path);
|
||||||
|
builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0);
|
||||||
|
Assert.assertEquals("Block size should be 0",
|
||||||
|
builder.getBlockSize(), 0);
|
||||||
|
Assert.assertEquals("Replication factor should be 0",
|
||||||
|
builder.getReplication(), 0);
|
||||||
|
Assert.assertEquals("Buffer size should be 0",
|
||||||
|
builder.getBufferSize(), 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
|
||||||
import org.apache.hadoop.fs.FSLinkResolver;
|
import org.apache.hadoop.fs.FSLinkResolver;
|
||||||
import org.apache.hadoop.fs.FileChecksum;
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||||
|
@ -483,6 +484,48 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
}.resolve(this, absF);
|
}.resolve(this, absF);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Same as
|
||||||
|
* {@link #create(Path, FsPermission, EnumSet<CreateFlag>, int, short, long,
|
||||||
|
* Progressable, ChecksumOpt)} with the addition of favoredNodes that is a
|
||||||
|
* hint to where the namenode should place the file blocks.
|
||||||
|
* The favored nodes hint is not persisted in HDFS. Hence it may be honored
|
||||||
|
* at the creation time only. And with favored nodes, blocks will be pinned
|
||||||
|
* on the datanodes to prevent balancing move the block. HDFS could move the
|
||||||
|
* blocks during replication, to move the blocks from favored nodes. A value
|
||||||
|
* of null means no favored nodes for this create
|
||||||
|
*/
|
||||||
|
private HdfsDataOutputStream create(final Path f,
|
||||||
|
final FsPermission permission, final EnumSet<CreateFlag> flag,
|
||||||
|
final int bufferSize, final short replication, final long blockSize,
|
||||||
|
final Progressable progress, final ChecksumOpt checksumOpt,
|
||||||
|
final InetSocketAddress[] favoredNodes) throws IOException {
|
||||||
|
statistics.incrementWriteOps(1);
|
||||||
|
storageStatistics.incrementOpCounter(OpType.CREATE);
|
||||||
|
Path absF = fixRelativePart(f);
|
||||||
|
return new FileSystemLinkResolver<HdfsDataOutputStream>() {
|
||||||
|
@Override
|
||||||
|
public HdfsDataOutputStream doCall(final Path p) throws IOException {
|
||||||
|
final DFSOutputStream out = dfs.create(getPathName(f), permission,
|
||||||
|
flag, true, replication, blockSize, progress, bufferSize,
|
||||||
|
checksumOpt, favoredNodes);
|
||||||
|
return dfs.createWrappedOutputStream(out, statistics);
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public HdfsDataOutputStream next(final FileSystem fs, final Path p)
|
||||||
|
throws IOException {
|
||||||
|
if (fs instanceof DistributedFileSystem) {
|
||||||
|
DistributedFileSystem myDfs = (DistributedFileSystem)fs;
|
||||||
|
return myDfs.create(p, permission, flag, bufferSize, replication,
|
||||||
|
blockSize, progress, checksumOpt, favoredNodes);
|
||||||
|
}
|
||||||
|
throw new UnsupportedOperationException("Cannot create with" +
|
||||||
|
" favoredNodes through a symlink to a non-DistributedFileSystem: "
|
||||||
|
+ f + " -> " + p);
|
||||||
|
}
|
||||||
|
}.resolve(this, absF);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected HdfsDataOutputStream primitiveCreate(Path f,
|
protected HdfsDataOutputStream primitiveCreate(Path f,
|
||||||
FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
|
FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
|
||||||
|
@ -2550,4 +2593,42 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
DFSOpsCountStatistics getDFSOpsCountStatistics() {
|
DFSOpsCountStatistics getDFSOpsCountStatistics() {
|
||||||
return storageStatistics;
|
return storageStatistics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extends FSDataOutputStreamBuilder to support special requirements
|
||||||
|
* of DistributedFileSystem.
|
||||||
|
*/
|
||||||
|
public static class HdfsDataOutputStreamBuilder
|
||||||
|
extends FSDataOutputStreamBuilder {
|
||||||
|
private final DistributedFileSystem dfs;
|
||||||
|
private InetSocketAddress[] favoredNodes = null;
|
||||||
|
|
||||||
|
public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
|
||||||
|
super(dfs, path);
|
||||||
|
this.dfs = dfs;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected InetSocketAddress[] getFavoredNodes() {
|
||||||
|
return favoredNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HdfsDataOutputStreamBuilder setFavoredNodes(
|
||||||
|
final InetSocketAddress[] nodes) {
|
||||||
|
Preconditions.checkNotNull(nodes);
|
||||||
|
favoredNodes = nodes.clone();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HdfsDataOutputStream build() throws IOException {
|
||||||
|
return dfs.create(getPath(), getPermission(), getFlags(),
|
||||||
|
getBufferSize(), getReplication(), getBlockSize(),
|
||||||
|
getProgress(), getChecksumOpt(), getFavoredNodes());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
|
||||||
|
return new HdfsDataOutputStreamBuilder(this, path);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,6 +75,7 @@ import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
|
import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.fs.VolumeId;
|
import org.apache.hadoop.fs.VolumeId;
|
||||||
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
@ -88,7 +89,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
|
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
|
@ -1519,4 +1519,37 @@ public class TestDistributedFileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDFSDataOutputStreamBuilder() throws Exception {
|
||||||
|
Configuration conf = getTestConfiguration();
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
String testFile = "/testDFSDataOutputStreamBuilder";
|
||||||
|
Path testFilePath = new Path(testFile);
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
|
||||||
|
// Test create an empty file
|
||||||
|
FSDataOutputStream out =
|
||||||
|
fs.newFSDataOutputStreamBuilder(testFilePath).build();
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
// Test create a file with content, and verify the content
|
||||||
|
String content = "This is a test!";
|
||||||
|
out = fs.newFSDataOutputStreamBuilder(testFilePath)
|
||||||
|
.setBufferSize(4096).setReplication((short) 1)
|
||||||
|
.setBlockSize(4096).build();
|
||||||
|
byte[] contentOrigin = content.getBytes("UTF8");
|
||||||
|
out.write(contentOrigin);
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
ContractTestUtils.verifyFileContents(fs, testFilePath,
|
||||||
|
content.getBytes());
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -189,6 +189,29 @@ public class TestFavoredNodesEndToEnd {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 180000)
|
||||||
|
public void testCreateStreamBuilderFavoredNodesEndToEnd() throws Exception {
|
||||||
|
//create 10 files with random preferred nodes
|
||||||
|
for (int i = 0; i < NUM_FILES; i++) {
|
||||||
|
Random rand = new Random(System.currentTimeMillis() + i);
|
||||||
|
//pass a new created rand so as to get a uniform distribution each time
|
||||||
|
//without too much collisions (look at the do-while loop in getDatanodes)
|
||||||
|
InetSocketAddress[] dns = getDatanodes(rand);
|
||||||
|
Path p = new Path("/filename"+i);
|
||||||
|
FSDataOutputStream out =
|
||||||
|
dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build();
|
||||||
|
out.write(SOME_BYTES);
|
||||||
|
out.close();
|
||||||
|
BlockLocation[] locations = getBlockLocations(p);
|
||||||
|
//verify the files got created in the right nodes
|
||||||
|
for (BlockLocation loc : locations) {
|
||||||
|
String[] hosts = loc.getNames();
|
||||||
|
String[] hosts1 = getStringForInetSocketAddrs(dns);
|
||||||
|
assertTrue(compareNodes(hosts, hosts1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private BlockLocation[] getBlockLocations(Path p) throws Exception {
|
private BlockLocation[] getBlockLocations(Path p) throws Exception {
|
||||||
DFSTestUtil.waitReplication(dfs, p, (short)3);
|
DFSTestUtil.waitReplication(dfs, p, (short)3);
|
||||||
BlockLocation[] locations = dfs.getClient().getBlockLocations(
|
BlockLocation[] locations = dfs.getClient().getBlockLocations(
|
||||||
|
|
Loading…
Reference in New Issue