HDFS-11643. Add shouldReplicate option to create builder. Contributed by SammiChen.

Andrew Wang 2017-05-04 11:39:14 -07:00
parent 81092b1f11
commit c2a52ef9c2
10 changed files with 170 additions and 32 deletions

CreateFlag.java

@@ -110,7 +110,13 @@ public enum CreateFlag {
    * 'local' means the same host as the client is being run on.
    */
   @InterfaceAudience.LimitedPrivate({"HBase"})
-  NO_LOCAL_WRITE((short) 0x40);
+  NO_LOCAL_WRITE((short) 0x40),
+
+  /**
+   * Enforce the file to be a replicated file, no matter what its parent
+   * directory's replication or erasure coding policy is.
+   */
+  SHOULD_REPLICATE((short) 0x80);
 
   private final short mode;
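
For context, any FileSystem.create() overload that accepts CreateFlag values can now carry the new flag. A minimal usage sketch (not part of this commit; the path, buffer size, replication factor, and block size are illustrative values):

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class ShouldReplicateExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // SHOULD_REPLICATE forces contiguous (replicated) blocks even when the
    // parent directory carries an erasure coding policy.
    try (FSDataOutputStream out = fs.create(new Path("/striped/file"),
        FsPermission.getFileDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.SHOULD_REPLICATE),
        4096, (short) 3, 128 << 20, null)) {
      out.writeBytes("replicated despite the parent's EC policy\n");
    }
  }
}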

DistributedFileSystem.java

@@ -100,6 +100,7 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
 
 import javax.annotation.Nonnull;
@@ -462,17 +463,20 @@ public class DistributedFileSystem extends FileSystem {
   /**
    * Same as
    * {@link #create(Path, FsPermission, EnumSet<CreateFlag>, int, short, long,
-   * Progressable, ChecksumOpt)} with the addition of favoredNodes that is a
-   * hint to where the namenode should place the file blocks.
-   * The favored nodes hint is not persisted in HDFS. Hence it may be honored
-   * at the creation time only. And with favored nodes, blocks will be pinned
-   * on the datanodes to prevent balancing move the block. HDFS could move the
-   * blocks during replication, to move the blocks from favored nodes. A value
-   * of null means no favored nodes for this create.
-   * Another addition is ecPolicyName. A non-null ecPolicyName specifies an
+   * Progressable, ChecksumOpt)} with a few additions. First, addition of
+   * favoredNodes that is a hint to where the namenode should place the file
+   * blocks. The favored nodes hint is not persisted in HDFS. Hence it may be
+   * honored at the creation time only. And with favored nodes, blocks will be
+   * pinned on the datanodes to prevent balancing move the block. HDFS could
+   * move the blocks during replication, to move the blocks from favored nodes.
+   * A value of null means no favored nodes for this create.
+   * The second addition is ecPolicyName. A non-null ecPolicyName specifies an
    * explicit erasure coding policy for this file, overriding the inherited
-   * policy. A null ecPolicyName means the file will inherit its EC policy from
-   * an ancestor (the default).
+   * policy. A null ecPolicyName means the file will inherit its EC policy or
+   * replication policy from its ancestor (the default).
+   * ecPolicyName and SHOULD_REPLICATE CreateFlag are mutually exclusive. It's
+   * invalid to set both SHOULD_REPLICATE and a non-null ecPolicyName.
+   *
    */
   private HdfsDataOutputStream create(final Path f,
       final FsPermission permission, final EnumSet<CreateFlag> flag,
@@ -2669,6 +2673,7 @@ public class DistributedFileSystem extends FileSystem {
     private final DistributedFileSystem dfs;
     private InetSocketAddress[] favoredNodes = null;
     private String ecPolicyName = null;
+    private boolean shouldReplicate = false;
 
     public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
       super(dfs, path);
@@ -2690,6 +2695,14 @@ public class DistributedFileSystem extends FileSystem {
       return ecPolicyName;
     }
 
+    /**
+     * Enforce the file to be a striped file with the erasure coding
+     * policy 'policyName', no matter what its parent directory's
+     * replication or erasure coding policy is. Don't call this function
+     * and replicate() in the same builder since they are mutually
+     * exclusive.
+     *
+     */
     public HdfsDataOutputStreamBuilder setEcPolicyName(
         @Nonnull final String policyName) {
       Preconditions.checkNotNull(policyName);
@@ -2697,9 +2710,33 @@ public class DistributedFileSystem extends FileSystem {
       return this;
     }
 
+    public boolean shouldReplicate() {
+      return shouldReplicate;
+    }
+
+    /**
+     * Enforce the file to be a replicated file, no matter what its parent
+     * directory's replication or erasure coding policy is. Don't call this
+     * function and setEcPolicyName() in the same builder since they are
+     * mutually exclusive.
+     */
+    public HdfsDataOutputStreamBuilder replicate() {
+      shouldReplicate = true;
+      return this;
+    }
+
     @Override
     public HdfsDataOutputStream build() throws IOException {
-      return dfs.create(getPath(), getPermission(), getFlags(),
+      Preconditions.checkState(
+          !(shouldReplicate() && (!StringUtils.isEmpty(getEcPolicyName()))),
+          "shouldReplicate and ecPolicyName are " +
+              "exclusive parameters. Setting both is not allowed!");
+      EnumSet<CreateFlag> createFlags = getFlags();
+      if (shouldReplicate()) {
+        createFlags.add(CreateFlag.SHOULD_REPLICATE);
+      }
+      return dfs.create(getPath(), getPermission(), createFlags,
           getBufferSize(), getReplication(), getBlockSize(),
           getProgress(), getChecksumOpt(), getFavoredNodes(),
           getEcPolicyName());
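
Taken together, the builder now offers two mutually exclusive layout choices at create time. A usage sketch (dfs, the paths, and the policy name RS-6-3-64k are illustrative assumptions, not values mandated by this patch):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class BuilderExample {
  static void demo(DistributedFileSystem dfs) throws IOException {
    // Force a replicated file, overriding any inherited EC policy.
    try (FSDataOutputStream out = dfs.newFSDataOutputStreamBuilder(
        new Path("/striped/plain")).replicate().build()) {
      out.writeBytes("contiguous blocks\n");
    }
    // Or force a specific EC policy; combining this with replicate() on the
    // same builder makes build() fail its Preconditions.checkState.
    try (FSDataOutputStream out = dfs.newFSDataOutputStreamBuilder(
        new Path("/plain/striped")).setEcPolicyName("RS-6-3-64k").build()) {
      out.writeBytes("striped blocks\n");
    }
  }
}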

ClientProtocol.java

@@ -154,8 +154,10 @@ public interface ClientProtocol {
   * @param src path of the file being created.
   * @param masked masked permission.
   * @param clientName name of the current client.
-  * @param flag indicates whether the file should be
-  * overwritten if it already exists or create if it does not exist or append.
+  * @param flag indicates whether the file should be overwritten if it already
+  * exists, created if it does not exist, or appended to, or whether the
+  * file should be a replicated file, no matter what its ancestor's
+  * replication or erasure coding policy is.
   * @param createParent create missing parent directory if true
   * @param replication block replication factor.
   * @param blockSize maximum block size.
@@ -163,7 +165,9 @@ public interface ClientProtocol {
   * @param ecPolicyName the name of erasure coding policy. A null value means
   *                     this file will inherit its parent directory's policy,
   *                     either traditional replication or erasure coding
-  *                     policy.
+  *                     policy. ecPolicyName and the SHOULD_REPLICATE flag
+  *                     are mutually exclusive. It is invalid to set both a
+  *                     non-null ecPolicyName and the SHOULD_REPLICATE flag.
   *
   * @return the status of the created file, it could be null if the server
   *         doesn't support returning the file status

PBHelperClient.java

@@ -1753,6 +1753,9 @@ public class PBHelperClient {
     if (flag.contains(CreateFlag.NEW_BLOCK)) {
       value |= CreateFlagProto.NEW_BLOCK.getNumber();
     }
+    if (flag.contains(CreateFlag.SHOULD_REPLICATE)) {
+      value |= CreateFlagProto.SHOULD_REPLICATE.getNumber();
+    }
     return value;
   }
@@ -1966,6 +1969,10 @@ public class PBHelperClient {
         == CreateFlagProto.NEW_BLOCK_VALUE) {
       result.add(CreateFlag.NEW_BLOCK);
     }
+    if ((flag & CreateFlagProto.SHOULD_REPLICATE.getNumber())
+        == CreateFlagProto.SHOULD_REPLICATE.getNumber()) {
+      result.add(CreateFlag.SHOULD_REPLICATE);
+    }
     return new EnumSetWritable<>(result, CreateFlag.class);
   }
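
Both converters follow the pattern of the existing flags: each CreateFlag contributes one bit to the wire value, and the decoder tests each known bit independently, so bits it does not recognize are silently dropped. A standalone sketch of the round trip for just this flag (the 0x80 constant mirrors CreateFlagProto; the other flags are elided):

import java.util.EnumSet;
import org.apache.hadoop.fs.CreateFlag;

public class FlagMaskExample {
  static final int SHOULD_REPLICATE_VALUE = 0x80; // mirrors CreateFlagProto

  static int toMask(EnumSet<CreateFlag> flags) {
    int value = 0;
    if (flags.contains(CreateFlag.SHOULD_REPLICATE)) {
      value |= SHOULD_REPLICATE_VALUE;
    }
    // ... each remaining flag contributes its own bit ...
    return value;
  }

  static EnumSet<CreateFlag> fromMask(int mask) {
    EnumSet<CreateFlag> result = EnumSet.noneOf(CreateFlag.class);
    if ((mask & SHOULD_REPLICATE_VALUE) == SHOULD_REPLICATE_VALUE) {
      result.add(CreateFlag.SHOULD_REPLICATE);
    }
    return result;
  }
}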

ClientNamenodeProtocol.proto

@@ -68,6 +68,7 @@ enum CreateFlagProto {
   APPEND = 0x04; // Append to a file
   LAZY_PERSIST = 0x10; // File with reduced durability guarantees.
   NEW_BLOCK = 0x20; // Write data to a new block when appending
+  SHOULD_REPLICATE = 0x80; // Enforce the file to be replicated
 }
 
 message CreateRequestProto {
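
The proto value 0x80 intentionally matches the short passed to the Java enum constructor in CreateFlag above, but nothing enforces that at compile time. A drift check along these lines would catch a mismatch (a sketch, not part of this patch; it assumes CreateFlag.getMode() and the generated CreateFlagProto class are on the classpath):

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;

public class FlagValueCheck {
  public static void main(String[] args) {
    // getMode() returns the short the enum constant was constructed with.
    if (CreateFlag.SHOULD_REPLICATE.getMode()
        != CreateFlagProto.SHOULD_REPLICATE.getNumber()) {
      throw new AssertionError("CreateFlag / CreateFlagProto value drift");
    }
  }
}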

NameNodeConnector.java

@@ -25,15 +25,18 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -241,7 +244,15 @@ public class NameNodeConnector implements Closeable {
       IOUtils.closeStream(fs.append(idPath));
       fs.delete(idPath, true);
     }
-    final FSDataOutputStream fsout = fs.create(idPath, false);
+
+    final FSDataOutputStream fsout = fs.newFSDataOutputStreamBuilder(idPath)
+        .replicate()
+        .setFlags(EnumSet.of(CreateFlag.CREATE))
+        .build();
+
+    Preconditions.checkState(!fs.getFileStatus(idPath).isErasureCoded(),
+        "Id file should be a replicated file");
+
     // mark balancer idPath to be deleted during filesystem closure
     fs.deleteOnExit(idPath);
     if (write2IdFile) {
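
The connector appends to the ID file (see fs.append(idPath) above), an operation erasure-coded files did not support at the time of this patch, so it both forces a replicated layout at create time and verifies the result. The same verification works for any path; a minimal sketch (fs and path are assumptions; FileStatus.isErasureCoded() is the accessor the patch itself relies on):

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicatedFileCheck {
  // Throws if the file at 'path' ended up erasure coded.
  static void assertReplicated(FileSystem fs, Path path) throws IOException {
    FileStatus status = fs.getFileStatus(path);
    if (status.isErasureCoded()) {
      throw new IllegalStateException(path + " should be a replicated file");
    }
  }
}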

FSDirWriteFileOp.java

@@ -352,7 +352,7 @@ class FSDirWriteFileOp {
       EnumSet<CreateFlag> flag, boolean createParent,
       short replication, long blockSize,
       FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
-      String ecPolicyName, boolean logRetryEntry)
+      boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry)
       throws IOException {
     assert fsn.hasWriteLock();
     boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
@@ -386,7 +386,8 @@ class FSDirWriteFileOp {
         FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
     if (parent != null) {
       iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
-          replication, blockSize, holder, clientMachine, ecPolicyName);
+          replication, blockSize, holder, clientMachine, shouldReplicate,
+          ecPolicyName);
       newNode = iip != null ? iip.getLastINode().asFile() : null;
     }
     if (newNode == null) {
@@ -522,8 +523,8 @@ class FSDirWriteFileOp {
   private static INodesInPath addFile(
       FSDirectory fsd, INodesInPath existing, byte[] localName,
       PermissionStatus permissions, short replication, long preferredBlockSize,
-      String clientName, String clientMachine, String ecPolicyName)
-      throws IOException {
+      String clientName, String clientMachine, boolean shouldReplicate,
+      String ecPolicyName) throws IOException {
 
     Preconditions.checkNotNull(existing);
     long modTime = now();
@@ -531,16 +532,18 @@ class FSDirWriteFileOp {
     fsd.writeLock();
     try {
       boolean isStriped = false;
-      ErasureCodingPolicy ecPolicy;
-      if (!StringUtils.isEmpty(ecPolicyName)) {
-        ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicyByName(
-            fsd.getFSNamesystem(), ecPolicyName);
-      } else {
-        ecPolicy = FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(
-            fsd.getFSNamesystem(), existing);
-      }
-      if (ecPolicy != null) {
-        isStriped = true;
+      ErasureCodingPolicy ecPolicy = null;
+      if (!shouldReplicate) {
+        if (!StringUtils.isEmpty(ecPolicyName)) {
+          ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicyByName(
+              fsd.getFSNamesystem(), ecPolicyName);
+        } else {
+          ecPolicy = FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(
+              fsd.getFSNamesystem(), existing);
+        }
+        if (ecPolicy != null) {
+          isStriped = true;
+        }
       }
       final BlockType blockType = isStriped ?
           BlockType.STRIPED : BlockType.CONTIGUOUS;
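
The nesting above fixes the precedence: an explicit SHOULD_REPLICATE wins outright, otherwise an explicit policy name beats the inherited directory policy, and only a resolved EC policy yields striped blocks. The same decision, restated as a standalone sketch (names are illustrative; lookupByName() stands in for FSDirErasureCodingOp.getErasureCodingPolicyByName()):

import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;

public class BlockTypeResolution {
  static BlockType resolve(boolean shouldReplicate, String ecPolicyName,
      ErasureCodingPolicy inheritedPolicy) {
    if (shouldReplicate) {
      return BlockType.CONTIGUOUS;       // the flag short-circuits EC entirely
    }
    ErasureCodingPolicy ecPolicy =
        (ecPolicyName == null || ecPolicyName.isEmpty())
            ? inheritedPolicy              // inherit from an ancestor directory
            : lookupByName(ecPolicyName);  // explicit policy for this file
    return ecPolicy != null ? BlockType.STRIPED : BlockType.CONTIGUOUS;
  }

  private static ErasureCodingPolicy lookupByName(String name) {
    throw new UnsupportedOperationException("illustrative stub");
  }
}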

FSNamesystem.java

@@ -2225,6 +2225,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throw new InvalidPathException(src);
     }
 
+    boolean shouldReplicate = flag.contains(CreateFlag.SHOULD_REPLICATE);
+    if (shouldReplicate &&
+        (!org.apache.commons.lang.StringUtils.isEmpty(ecPolicyName))) {
+      throw new HadoopIllegalArgumentException("SHOULD_REPLICATE flag and " +
+          "ecPolicyName are exclusive parameters. Setting both is not allowed!");
+    }
+
     FSPermissionChecker pc = getPermissionChecker();
     INodesInPath iip = null;
     boolean skipSync = true; // until we do something that might create edits
@@ -2240,7 +2247,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       iip = FSDirWriteFileOp.resolvePathForStartFile(
           dir, pc, src, flag, createParent);
 
-      if (!FSDirErasureCodingOp.hasErasureCodingPolicy(this, iip)) {
+      if (shouldReplicate ||
+          (org.apache.commons.lang.StringUtils.isEmpty(ecPolicyName) &&
+          !FSDirErasureCodingOp.hasErasureCodingPolicy(this, iip))) {
         blockManager.verifyReplication(src, replication, clientMachine);
       }
@@ -2272,7 +2281,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
           clientMachine, flag, createParent, replication, blockSize, feInfo,
-          toRemoveBlocks, ecPolicyName, logRetryCache);
+          toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
     } catch (IOException e) {
       skipSync = e instanceof StandbyException;
       throw e;

TestErasureCodingPolicies.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -46,6 +47,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
@@ -564,4 +566,60 @@ public class TestErasureCodingPolicies {
     assertEquals(ecPolicyOnDir, fs.getErasureCodingPolicy(dirPath));
     fs.delete(dirPath, true);
   }
+
+  /**
+   * Enforce a file to be replicated, regardless of its parent's EC policy.
+   */
+  @Test
+  public void testEnforceAsReplicatedFile() throws Exception {
+    final Path dirPath = new Path("/striped");
+    final Path filePath = new Path(dirPath, "file");
+
+    fs.mkdirs(dirPath);
+    fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName());
+
+    final String ecPolicyName = "RS-10-4-64k";
+    fs.newFSDataOutputStreamBuilder(filePath).build().close();
+    assertEquals(EC_POLICY, fs.getErasureCodingPolicy(filePath));
+    fs.delete(filePath, true);
+
+    fs.newFSDataOutputStreamBuilder(filePath)
+        .setEcPolicyName(ecPolicyName)
+        .build()
+        .close();
+    assertEquals(ecPolicyName, fs.getErasureCodingPolicy(filePath).getName());
+    fs.delete(filePath, true);
+
+    try {
+      fs.newFSDataOutputStreamBuilder(filePath)
+          .setEcPolicyName(ecPolicyName)
+          .replicate()
+          .build().close();
+      Assert.fail("shouldReplicate and ecPolicyName are exclusive " +
+          "parameters. Setting both is not allowed.");
+    } catch (Exception e) {
+      GenericTestUtils.assertExceptionContains("shouldReplicate and " +
+          "ecPolicyName are exclusive parameters. Setting both is not allowed!", e);
+    }
+
+    try {
+      final DFSClient dfsClient = fs.getClient();
+      dfsClient.create(filePath.toString(), null,
+          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE,
+          CreateFlag.SHOULD_REPLICATE), false, (short) 1, 1024, null, 1024,
+          null, null, ecPolicyName);
+      Assert.fail("SHOULD_REPLICATE flag and ecPolicyName are exclusive " +
+          "parameters. Setting both is not allowed.");
+    } catch (Exception e) {
+      GenericTestUtils.assertExceptionContains("SHOULD_REPLICATE flag and " +
+          "ecPolicyName are exclusive parameters. Setting both is not allowed!", e);
+    }
+
+    fs.newFSDataOutputStreamBuilder(filePath)
+        .replicate()
+        .build()
+        .close();
+    assertNull(fs.getErasureCodingPolicy(filePath));
+    fs.delete(dirPath, true);
+  }
 }

TestBalancer.java

@@ -1946,7 +1946,9 @@ public class TestBalancer {
   public void testBalancerWithStripedFile() throws Exception {
     Configuration conf = new Configuration();
     initConfWithStripe(conf);
+    NameNodeConnector.setWrite2IdFile(true);
     doTestBalancerWithStripedFile(conf);
+    NameNodeConnector.setWrite2IdFile(false);
   }
 
   private void doTestBalancerWithStripedFile(Configuration conf) throws Exception {