HDFS-14283. DFSInputStream to prefer cached replica. Contributed by Lisheng Sun.
This commit is contained in:
parent
adecdb8b53
commit
7fddf4855e
|
@ -1054,10 +1054,21 @@ public class DFSInputStream extends FSInputStream
|
|||
StorageType[] storageTypes = block.getStorageTypes();
|
||||
DatanodeInfo chosenNode = null;
|
||||
StorageType storageType = null;
|
||||
if (nodes != null) {
|
||||
if (dfsClient.getConf().isReadUseCachePriority()) {
|
||||
DatanodeInfo[] cachedLocs = block.getCachedLocations();
|
||||
if (cachedLocs != null) {
|
||||
for (int i = 0; i < cachedLocs.length; i++) {
|
||||
if (isValidNode(cachedLocs[i], ignoredNodes)) {
|
||||
chosenNode = cachedLocs[i];
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (chosenNode == null && nodes != null) {
|
||||
for (int i = 0; i < nodes.length; i++) {
|
||||
if (!dfsClient.getDeadNodes(this).containsKey(nodes[i])
|
||||
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
|
||||
if (isValidNode(nodes[i], ignoredNodes)) {
|
||||
chosenNode = nodes[i];
|
||||
// Storage types are ordered to correspond with nodes, so use the same
|
||||
// index to get storage type.
|
||||
|
@ -1090,6 +1101,15 @@ public class DFSInputStream extends FSInputStream
|
|||
", ignoredNodes = " + ignoredNodes);
|
||||
}
|
||||
|
||||
private boolean isValidNode(DatanodeInfo node,
|
||||
Collection<DatanodeInfo> ignoredNodes) {
|
||||
if (!dfsClient.getDeadNodes(this).containsKey(node)
|
||||
&& (ignoredNodes == null || !ignoredNodes.contains(node))) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static String getBestNodeDNAddrPairErrorString(
|
||||
DatanodeInfo nodes[], AbstractMap<DatanodeInfo,
|
||||
DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) {
|
||||
|
|
|
@ -108,6 +108,9 @@ public interface HdfsClientConfigKeys {
|
|||
String DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL =
|
||||
"dfs.client.use.legacy.blockreader.local";
|
||||
boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false;
|
||||
// Client-side switch: when set to true, DFSInputStream looks through a
// block's cached locations for a usable datanode before it falls back to the
// regular replica locations.
String DFS_CLIENT_READ_USE_CACHE_PRIORITY =
|
||||
"dfs.client.read.use.cache.priority";
|
||||
// Cached-replica preference is off unless explicitly enabled.
boolean DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT = false;
|
||||
String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY =
|
||||
"dfs.client.datanode-restart.timeout";
|
||||
long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;
|
||||
|
|
|
@ -60,6 +60,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_
|
|||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
|
||||
|
@ -150,6 +152,8 @@ public class DfsClientConf {
|
|||
|
||||
private final boolean dataTransferTcpNoDelay;
|
||||
|
||||
private final boolean readUseCachePriority;
|
||||
|
||||
private final boolean deadNodeDetectionEnabled;
|
||||
private final long leaseHardLimitPeriod;
|
||||
|
||||
|
@ -260,6 +264,8 @@ public class DfsClientConf {
|
|||
slowIoWarningThresholdMs = conf.getLong(
|
||||
DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
|
||||
DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
|
||||
readUseCachePriority = conf.getBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY,
|
||||
DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT);
|
||||
|
||||
refreshReadBlockLocationsMS = conf.getLong(
|
||||
HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
|
||||
|
@ -630,6 +636,13 @@ public class DfsClientConf {
|
|||
return leaseHardLimitPeriod;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the readUseCachePriority
|
||||
*/
|
||||
public boolean isReadUseCachePriority() {
|
||||
return readUseCachePriority;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the replicaAccessorBuilderClasses
|
||||
*/
|
||||
|
|
|
@ -268,6 +268,7 @@ public class LocatedBlock {
|
|||
+ "; corrupt=" + corrupt
|
||||
+ "; offset=" + offset
|
||||
+ "; locs=" + Arrays.asList(locs)
|
||||
+ "; cachedLocs=" + Arrays.asList(cachedLocs)
|
||||
+ "}";
|
||||
}
|
||||
|
||||
|
|
|
@ -2978,6 +2978,15 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.client.read.use.cache.priority</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
If true, a replica cached on a datanode is preferred;
|
||||
otherwise, the replica closest to the client is preferred.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.block.local-path-access.user</name>
|
||||
<value></value>
|
||||
|
|
|
@ -17,11 +17,14 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -33,7 +36,9 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||
|
@ -219,4 +224,62 @@ public class TestDFSInputStream {
|
|||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadWithPreferredCachingReplica() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY, true);
|
||||
MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem fs = null;
|
||||
Path filePath = new Path("/testReadPreferredCachingReplica");
|
||||
try {
|
||||
fs = cluster.getFileSystem();
|
||||
FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
|
||||
DFSInputStream dfsInputStream =
|
||||
(DFSInputStream) fs.open(filePath).getWrappedStream();
|
||||
LocatedBlock lb = mock(LocatedBlock.class);
|
||||
when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
|
||||
DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
|
||||
1112, 1113, 1114);
|
||||
DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
|
||||
when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
|
||||
DatanodeInfo retDNInfo =
|
||||
dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
|
||||
assertEquals(dnInfo, retDNInfo);
|
||||
} finally {
|
||||
fs.delete(filePath, true);
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadWithoutPreferredCachingReplica() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY, false);
|
||||
MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem fs = null;
|
||||
Path filePath = new Path("/testReadWithoutPreferredCachingReplica");
|
||||
try {
|
||||
fs = cluster.getFileSystem();
|
||||
FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
|
||||
DFSInputStream dfsInputStream =
|
||||
(DFSInputStream) fs.open(filePath).getWrappedStream();
|
||||
LocatedBlock lb = mock(LocatedBlock.class);
|
||||
when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
|
||||
DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
|
||||
1112, 1113, 1114);
|
||||
DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
|
||||
when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
|
||||
DatanodeInfo retDNInfo =
|
||||
dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
|
||||
assertEquals(dnInfo, retDNInfo);
|
||||
} finally {
|
||||
fs.delete(filePath, true);
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue