HDFS-13739. Add option to disable rack local write preference. Contributed by Ayush Saxena.

Ayush Saxena 2020-02-18 22:39:28 +05:30
parent d6d7f8d8c5
commit ac4b556e2d
7 changed files with 74 additions and 4 deletions

CreateFlag.java

@@ -123,7 +123,13 @@ public enum CreateFlag {
   * locality. The first block replica should be placed randomly within the
   * cluster. Subsequent block replicas should follow DataNode locality rules.
   */
  IGNORE_CLIENT_LOCALITY((short) 0x100),
  /**
   * Advise that a block replica NOT be written to a DataNode on the local
   * rack, where 'local' means the rack the client is running on.
   */
  NO_LOCAL_RACK((short) 0x200);
  private final short mode;
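The existing CreateFlag constants are single-bit masks, so the new flag takes the next unused bit. The sketch below shows the flag on the EnumSet-based create() path (the builder path is added later in this change); it is illustrative only, and the file path, buffer size, replication and block size are made-up values, not part of the patch.

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class NoLocalRackCreateSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Ask that no replica of this file's blocks be placed on the client's rack.
    EnumSet<CreateFlag> flags =
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.NO_LOCAL_RACK);
    try (FSDataOutputStream out = fs.create(new Path("/tmp/no-local-rack-demo"),
        FsPermission.getFileDefault(), flags, 4096, (short) 3,
        128 * 1024 * 1024, null)) {
      out.writeBytes("example payload");
    }
  }
}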

AddBlockFlag.java

@@ -51,7 +51,15 @@ public enum AddBlockFlag {
   *
   * @see CreateFlag#IGNORE_CLIENT_LOCALITY
   */
  IGNORE_CLIENT_LOCALITY((short) 0x02),
  /**
   * Advise that a block replica NOT be written to a DataNode on the local
   * rack, where 'local' means the same rack as the client is running on.
   *
   * @see CreateFlag#NO_LOCAL_RACK
   */
  NO_LOCAL_RACK((short) 0x04);
  private final short mode;

DFSOutputStream.java

@@ -201,6 +201,9 @@ public class DFSOutputStream extends FSOutputSummer
    if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) {
      this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE);
    }
    if (flag.contains(CreateFlag.NO_LOCAL_RACK)) {
      this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_RACK);
    }
    if (flag.contains(CreateFlag.IGNORE_CLIENT_LOCALITY)) {
      this.addBlockFlags.add(AddBlockFlag.IGNORE_CLIENT_LOCALITY);
    }
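For context, the AddBlockFlag recorded here travels with each block allocation the output stream requests from the NameNode. Below is a small sketch, not part of this patch, of the ClientProtocol#addBlock overload that carries the flag set (the same overload the new test at the end of this change calls); the helper name and its parameters are illustrative.

import java.util.EnumSet;

import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

public class AddBlockFlagSketch {
  /**
   * Ask the NameNode for the next block of src, requesting that no replica
   * be placed on the calling client's rack.
   */
  static LocatedBlock allocateOffRackBlock(ClientProtocol namenode, String src,
      String clientName, long fileId) throws Exception {
    return namenode.addBlock(src, clientName, null /* previous */,
        null /* excludeNodes */, fileId, null /* favoredNodes */,
        EnumSet.of(AddBlockFlag.NO_LOCAL_RACK));
  }
}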

DistributedFileSystem.java

@@ -3384,6 +3384,16 @@ public class DistributedFileSystem extends FileSystem
      return this;
    }
    /**
     * Advise that no block replica be written to a DataNode on the client's
     * local rack.
     *
     * @see CreateFlag for the details.
     */
    public HdfsDataOutputStreamBuilder noLocalRack() {
      getFlags().add(CreateFlag.NO_LOCAL_RACK);
      return this;
    }
    @VisibleForTesting
    String getStoragePolicyName() {
      return storagePolicyName;
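A short usage sketch of the new builder option, chaining like the existing noLocalWrite(); the target path, replication factor and payload are illustrative, and the cast assumes fs.defaultFS points at an HDFS cluster.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class NoLocalRackBuilderSketch {
  public static void main(String[] args) throws Exception {
    DistributedFileSystem fs =
        (DistributedFileSystem) FileSystem.get(new Configuration());
    Path target = new Path("/data/ingest/part-00000");
    try (FSDataOutputStream out = fs.createFile(target)
        .replication((short) 3)
        .noLocalRack()   // added by this change: keep replicas off the client's rack
        .build()) {
      out.writeBytes("example payload");
    }
  }
}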

BlockPlacementPolicyDefault.java

@@ -284,14 +284,35 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    Node localNode = null;
    boolean avoidStaleNodes = (stats != null
        && stats.isAvoidingStaleDataNodesForWrite());
    boolean avoidLocalRack = (addBlockFlags != null
        && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_RACK) && writer != null
        && clusterMap.getNumOfRacks() > 2);
    boolean avoidLocalNode = (addBlockFlags != null
        && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
        && writer != null
        && !excludedNodes.contains(writer));
    // Attempt to exclude the local rack if the client requests it. If not
    // enough nodes can be obtained, or there are fewer than three racks, it
    // falls back to the default block placement policy.
    if (avoidLocalRack) {
      results = new ArrayList<>(chosenStorage);
      Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
      excludedNodeCopy
          .addAll(clusterMap.getLeaves(writer.getNetworkLocation()));
      localNode = chooseTarget(numOfReplicas, writer, excludedNodeCopy,
          blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
          EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
      if (results.size() < numOfReplicas) {
        // not enough nodes; discard results and fall back
        results = null;
      }
    }
    // Attempt to exclude local node if the client suggests so. If no enough
    // nodes can be obtained, it falls back to the default block placement
    // policy.
    if (avoidLocalNode && results == null) {
      results = new ArrayList<>(chosenStorage);
      Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
      if (writer != null) {
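The rack-exclusion branch above works by adding every DataNode under the writer's rack to the excluded set before delegating to the usual target selection. The standalone sketch below illustrates just that exclusion step with the public NetworkTopology API; the class, host and rack names are illustrative and not part of this patch.

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;

public class RackExclusionSketch {
  /** Copy excludedNodes and add every node on the writer's rack to it. */
  static Set<Node> excludeWritersRack(NetworkTopology clusterMap, Node writer,
      Set<Node> excludedNodes) {
    Set<Node> copy = new HashSet<>(excludedNodes);
    // getLeaves(scope) returns every node under the given rack path,
    // e.g. "/RACK0"; a node's rack is its network location.
    copy.addAll(clusterMap.getLeaves(writer.getNetworkLocation()));
    return copy;
  }

  public static void main(String[] args) {
    NetworkTopology topo = NetworkTopology.getInstance(new Configuration());
    Node writer = new NodeBase("host0", "/RACK0");
    topo.add(writer);
    topo.add(new NodeBase("host1", "/RACK0"));
    topo.add(new NodeBase("host2", "/RACK1"));
    // Both /RACK0 hosts end up excluded; /RACK1 remains available.
    Set<Node> excluded = excludeWritersRack(topo, writer, new HashSet<Node>());
    System.out.println(excluded);
  }
}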

TestDistributedFileSystem.java

@@ -1709,7 +1709,7 @@ public class TestDistributedFileSystem {
    HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath);
    builder.append().overwrite(false).newBlock().lazyPersist().noLocalWrite()
        .ecPolicyName("ec-policy").noLocalRack();
    EnumSet<CreateFlag> flags = builder.getFlags();
    assertTrue(flags.contains(CreateFlag.APPEND));
    assertTrue(flags.contains(CreateFlag.CREATE));
@@ -1717,6 +1717,7 @@
    assertTrue(flags.contains(CreateFlag.NO_LOCAL_WRITE));
    assertFalse(flags.contains(CreateFlag.OVERWRITE));
    assertFalse(flags.contains(CreateFlag.SYNC_BLOCK));
    assertTrue(flags.contains(CreateFlag.NO_LOCAL_RACK));
    assertEquals("ec-policy", builder.getEcPolicyName());
    assertFalse(builder.shouldReplicate());

TestDefaultBlockPlacementPolicy.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -98,6 +99,26 @@ public class TestDefaultBlockPlacementPolicy {
    testPlacement(clientMachine, "/RACK3", true);
  }
  @Test
  public void testNonLocalRackPlacement() throws Exception {
    String clientMachine = "/host0";
    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
    flags.add(CreateFlag.NO_LOCAL_RACK);
    HdfsFileStatus fileStatus = namesystem.startFile("/file", perm,
        clientMachine, clientMachine, flags, true, REPLICATION_FACTOR,
        DEFAULT_BLOCK_SIZE, null, null, null, false);
    LocatedBlock locatedBlock = nameNodeRpc.addBlock("/file", clientMachine,
        null, null, fileStatus.getFileId(), null,
        EnumSet.of(AddBlockFlag.NO_LOCAL_RACK));
    // No replica may land on the client's rack.
    assertNotEquals("/RACK0",
        locatedBlock.getLocations()[0].getNetworkLocation());
    assertNotEquals("/RACK0",
        locatedBlock.getLocations()[1].getNetworkLocation());
    assertNotEquals("/RACK0",
        locatedBlock.getLocations()[2].getNetworkLocation());
  }
  /**
   * Verify local node selection with using DFSNetworkTopology.
   */
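The rack and host layout behind these assertions is defined in the test class's setup code, which this diff does not touch. The fragment below is one plausible layout, assumed here for illustration only: host0 lives on /RACK0, so a NO_LOCAL_RACK allocation from that client must avoid /RACK0 entirely.

// Hypothetical @Before setup; the real values live in
// TestDefaultBlockPlacementPolicy and are not shown in this change.
Configuration conf = new HdfsConfiguration();
String[] racks = {"/RACK0", "/RACK1", "/RACK2", "/RACK3"};
String[] hosts = {"host0", "host1", "host2", "host3"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
    .numDataNodes(4)
    .racks(racks)
    .hosts(hosts)
    .build();
cluster.waitActive();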