HDDS-1586. Allow Ozone RPC client to read with topology awareness. Contributed by Sammi Chen.

Xiaoyu Yao 2019-07-09 14:43:55 -07:00
parent 96d0555913
commit 030307226a
39 changed files with 803 additions and 89 deletions

View File

@@ -23,6 +23,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
@@ -64,7 +65,7 @@
 import java.util.concurrent.TimeoutException;
 /**
- * A Client for the storageContainer protocol.
+ * A Client for the storageContainer protocol for read object data.
  */
 public class XceiverClientGrpc extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
@@ -76,6 +77,7 @@
   private final Semaphore semaphore;
   private boolean closed = false;
   private SecurityConfig secConfig;
+  private final boolean topologyAwareRead;
   /**
    * Constructs a client that can communicate with the Container framework on
@@ -96,6 +98,9 @@
     this.metrics = XceiverClientManager.getXceiverClientMetrics();
     this.channels = new HashMap<>();
     this.asyncStubs = new HashMap<>();
+    this.topologyAwareRead = Boolean.parseBoolean(config.get(
+        ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED,
+        ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT));
   }
   /**
@@ -103,9 +108,10 @@
    */
   @Override
   public void connect() throws Exception {
-    // leader by default is the 1st datanode in the datanode list of pipleline
-    DatanodeDetails dn = this.pipeline.getFirstNode();
-    // just make a connection to the 1st datanode at the beginning
+    // connect to the closest node, if closest node doesn't exist, delegate to
+    // first node, which is usually the leader in the pipeline.
+    DatanodeDetails dn = this.pipeline.getClosestNode();
+    // just make a connection to the picked datanode at the beginning
     connectToDatanode(dn, null);
   }
@@ -114,9 +120,11 @@
    */
   @Override
   public void connect(String encodedToken) throws Exception {
-    // leader by default is the 1st datanode in the datanode list of pipleline
-    DatanodeDetails dn = this.pipeline.getFirstNode();
-    // just make a connection to the 1st datanode at the beginning
+    // connect to the closest node, if closest node doesn't exist, delegate to
+    // first node, which is usually the leader in the pipeline.
+    DatanodeDetails dn;
+    dn = this.pipeline.getClosestNode();
+    // just make a connection to the picked datanode at the beginning
     connectToDatanode(dn, encodedToken);
   }
@@ -132,7 +140,8 @@
     // Add credential context to the client call
     String userName = UserGroupInformation.getCurrentUser().getShortUserName();
-    LOG.debug("Connecting to server Port : " + dn.getIpAddress());
+    LOG.debug("Nodes in pipeline : {}", pipeline.getNodes().toString());
+    LOG.debug("Connecting to server : {}", dn.getIpAddress());
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
             .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
@@ -252,7 +261,15 @@
     // TODO: cache the correct leader info in here, so that any subsequent calls
     // should first go to leader
     XceiverClientReply reply = new XceiverClientReply(null);
-    for (DatanodeDetails dn : pipeline.getNodes()) {
+    List<DatanodeDetails> datanodeList;
+    if ((request.getCmdType() == ContainerProtos.Type.ReadChunk ||
+        request.getCmdType() == ContainerProtos.Type.GetSmallFile) &&
+        topologyAwareRead) {
+      datanodeList = pipeline.getNodesInOrder();
+    } else {
+      datanodeList = pipeline.getNodes();
+    }
+    for (DatanodeDetails dn : datanodeList) {
       try {
         LOG.debug("Executing command " + request + " on datanode " + dn);
         // In case the command gets retried on a 2nd datanode,
@@ -349,6 +366,8 @@
       reconnect(dn, token);
     }
+    LOG.debug("Send command {} to datanode {}", request.getCmdType().toString(),
+        dn.getNetworkFullPath());
     final CompletableFuture<ContainerCommandResponseProto> replyFuture =
         new CompletableFuture<>();
     semaphore.acquire();
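
Note: the dispatch change above only consults the topology-sorted node list for read-only commands. A minimal sketch of that selection rule, with a hypothetical helper name (not part of this commit), assuming the Pipeline and ContainerProtos types shown in the diff:

    // Sketch: pick which datanode list a command should iterate over.
    // Only ReadChunk and GetSmallFile may use the topology-sorted order;
    // every other command type falls back to the original pipeline order.
    private List<DatanodeDetails> selectDatanodes(
        ContainerCommandRequestProto request, Pipeline pipeline) {
      boolean isRead = request.getCmdType() == ContainerProtos.Type.ReadChunk
          || request.getCmdType() == ContainerProtos.Type.GetSmallFile;
      return (isRead && topologyAwareRead)
          ? pipeline.getNodesInOrder()   // closest replica first
          : pipeline.getNodes();         // leader (first node) first
    }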

View File

@@ -29,6 +29,8 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
@@ -57,7 +59,8 @@
  * not being used for a period of time.
  */
 public class XceiverClientManager implements Closeable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(XceiverClientManager.class);
   //TODO : change this to SCM configuration class
   private final Configuration conf;
   private final Cache<String, XceiverClientSpi> clientCache;
@@ -65,6 +68,7 @@
   private static XceiverClientMetrics metrics;
   private boolean isSecurityEnabled;
+  private final boolean topologyAwareRead;
   /**
    * Creates a new XceiverClientManager.
    *
@@ -98,6 +102,9 @@
           }
         }
       }).build();
+    topologyAwareRead = Boolean.parseBoolean(conf.get(
+        ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED,
+        ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT));
   }
   @VisibleForTesting
@@ -118,12 +125,32 @@
    */
   public XceiverClientSpi acquireClient(Pipeline pipeline)
       throws IOException {
+    return acquireClient(pipeline, false);
+  }
+  /**
+   * Acquires a XceiverClientSpi connected to a container for read.
+   *
+   * If there is already a cached XceiverClientSpi, simply return
+   * the cached otherwise create a new one.
+   *
+   * @param pipeline the container pipeline for the client connection
+   * @return XceiverClientSpi connected to a container
+   * @throws IOException if a XceiverClientSpi cannot be acquired
+   */
+  public XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
+      throws IOException {
+    return acquireClient(pipeline, true);
+  }
+  private XceiverClientSpi acquireClient(Pipeline pipeline, boolean read)
+      throws IOException {
     Preconditions.checkNotNull(pipeline);
     Preconditions.checkArgument(pipeline.getNodes() != null);
     Preconditions.checkArgument(!pipeline.getNodes().isEmpty());
     synchronized (clientCache) {
-      XceiverClientSpi info = getClient(pipeline);
+      XceiverClientSpi info = getClient(pipeline, read);
       info.incrementReference();
       return info;
     }
@@ -136,12 +163,28 @@
    * @param invalidateClient if true, invalidates the client in cache
    */
   public void releaseClient(XceiverClientSpi client, boolean invalidateClient) {
+    releaseClient(client, invalidateClient, false);
+  }
+  /**
+   * Releases a read XceiverClientSpi after use.
+   *
+   * @param client client to release
+   * @param invalidateClient if true, invalidates the client in cache
+   */
+  public void releaseClientForReadData(XceiverClientSpi client,
+      boolean invalidateClient) {
+    releaseClient(client, invalidateClient, true);
+  }
+  private void releaseClient(XceiverClientSpi client, boolean invalidateClient,
+      boolean read) {
     Preconditions.checkNotNull(client);
     synchronized (clientCache) {
       client.decrementReference();
       if (invalidateClient) {
         Pipeline pipeline = client.getPipeline();
-        String key = pipeline.getId().getId().toString() + pipeline.getType();
+        String key = getPipelineCacheKey(pipeline, read);
         XceiverClientSpi cachedClient = clientCache.getIfPresent(key);
         if (cachedClient == client) {
           clientCache.invalidate(key);
@@ -150,11 +193,13 @@
       }
     }
-  private XceiverClientSpi getClient(Pipeline pipeline)
+  private XceiverClientSpi getClient(Pipeline pipeline, boolean forRead)
       throws IOException {
     HddsProtos.ReplicationType type = pipeline.getType();
     try {
-      String key = pipeline.getId().getId().toString() + type;
+      // create different client for read different pipeline node based on
+      // network topology
+      String key = getPipelineCacheKey(pipeline, forRead);
       // Append user short name to key to prevent a different user
       // from using same instance of xceiverClient.
       key = isSecurityEnabled ?
@@ -184,6 +229,19 @@
     }
   }
+  private String getPipelineCacheKey(Pipeline pipeline, boolean forRead) {
+    String key = pipeline.getId().getId().toString() + pipeline.getType();
+    if (topologyAwareRead && forRead) {
+      try {
+        key += pipeline.getClosestNode().getHostName();
+      } catch (IOException e) {
+        LOG.error("Failed to get closest node to create pipeline cache key:" +
+            e.getMessage());
+      }
+    }
+    return key;
+  }
   /**
    * Close and remove all the cached clients.
    */
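
A hedged usage sketch of the new read-client API (the conf/pipeline variables and surrounding setup are assumed, not from this commit). Because the cache key for read clients includes the closest node, two readers in different racks can end up connected to different replicas of the same pipeline:

    XceiverClientManager manager = new XceiverClientManager(conf);
    XceiverClientSpi readClient = manager.acquireClientForReadData(pipeline);
    try {
      // issue ReadChunk / GetSmallFile calls through readClient here
    } finally {
      manager.releaseClientForReadData(readClient, false);
    }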

View File

@@ -151,7 +151,7 @@
       pipeline = Pipeline.newBuilder(pipeline)
           .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
     }
-    xceiverClient = xceiverClientManager.acquireClient(pipeline);
+    xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);
     boolean success = false;
     List<ChunkInfo> chunks;
     try {
@@ -170,7 +170,7 @@
       success = true;
     } finally {
       if (!success) {
-        xceiverClientManager.releaseClient(xceiverClient, false);
+        xceiverClientManager.releaseClientForReadData(xceiverClient, false);
       }
     }

View File

@@ -76,6 +76,7 @@
     this.ipAddress = datanodeDetails.ipAddress;
     this.hostName = datanodeDetails.hostName;
     this.ports = datanodeDetails.ports;
+    this.setNetworkName(datanodeDetails.getNetworkName());
   }
   /**
@@ -192,6 +193,12 @@
       builder.addPort(newPort(
           Port.Name.valueOf(port.getName().toUpperCase()), port.getValue()));
     }
+    if (datanodeDetailsProto.hasNetworkLocation()) {
+      builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
+    }
+    if (datanodeDetailsProto.hasNetworkName()) {
+      builder.setNetworkName(datanodeDetailsProto.getNetworkName());
+    }
     return builder.build();
   }
@@ -213,6 +220,7 @@
       builder.setCertSerialId(certSerialId);
     }
     builder.setNetworkLocation(getNetworkLocation());
+    builder.setNetworkName(getNetworkName());
     for (Port port : ports) {
       builder.addPorts(HddsProtos.Port.newBuilder()
@@ -268,6 +276,7 @@
     private String id;
     private String ipAddress;
     private String hostName;
+    private String networkName;
     private String networkLocation;
     private List<Port> ports;
     private String certSerialId;
@@ -313,6 +322,17 @@
       return this;
     }
+    /**
+     * Sets the network name of DataNode.
+     *
+     * @param name network name
+     * @return DatanodeDetails.Builder
+     */
+    public Builder setNetworkName(String name) {
+      this.networkName = name;
+      return this;
+    }
     /**
      * Sets the network location of DataNode.
      *
@@ -358,8 +378,12 @@
       if (networkLocation == null) {
         networkLocation = NetConstants.DEFAULT_RACK;
       }
-      return new DatanodeDetails(id, ipAddress, hostName, networkLocation,
-          ports, certSerialId);
+      DatanodeDetails dn = new DatanodeDetails(id, ipAddress, hostName,
+          networkLocation, ports, certSerialId);
+      if (networkName != null) {
+        dn.setNetworkName(networkName);
+      }
+      return dn;
     }
   }

View File

@@ -368,6 +368,10 @@
       "ozone.scm.network.topology.schema.file";
   public static final String OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT =
       "network-topology-default.xml";
+  public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED =
+      "dfs.network.topology.aware.read.enable";
+  public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT =
+      "true";
   public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled";
   public static final boolean HDDS_TRACING_ENABLED_DEFAULT = true;

View File

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.net;
 import java.util.Collection;
+import java.util.List;
 /**
  * The interface defines a network topology.
@@ -246,5 +247,6 @@
    * @param nodes Available replicas with the requested data
    * @param activeLen Number of active nodes at the front of the array
    */
-  void sortByDistanceCost(Node reader, Node[] nodes, int activeLen);
+  List<? extends Node> sortByDistanceCost(Node reader,
+      List<? extends Node> nodes, int activeLen);
 }

View File

@@ -23,6 +23,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -545,10 +546,6 @@
           ancestorGen);
       return null;
     }
-    LOG.debug("Choosing random from \"{}\" available nodes on node \"{}\"," +
-        " scope=\"{}\", excludedScope=\"{}\", excludeNodes=\"{}\".",
-        availableNodes, scopeNode, scopeNode.getNetworkFullPath(),
-        excludedScope, excludedNodes);
     // scope is a Leaf node
     if (!(scopeNode instanceof InnerNode)) {
@@ -556,15 +553,21 @@
     }
     Node ret;
+    int nodeIndex;
     if (leafIndex >= 0) {
-      ret = ((InnerNode)scopeNode).getLeaf(leafIndex % availableNodes,
-          excludedScope, mutableExNodes, ancestorGen);
+      nodeIndex = leafIndex % availableNodes;
+      ret = ((InnerNode)scopeNode).getLeaf(nodeIndex, excludedScope,
+          mutableExNodes, ancestorGen);
     } else {
-      final int index = ThreadLocalRandom.current().nextInt(availableNodes);
-      ret = ((InnerNode)scopeNode).getLeaf(index, excludedScope, mutableExNodes,
-          ancestorGen);
+      nodeIndex = ThreadLocalRandom.current().nextInt(availableNodes);
+      ret = ((InnerNode)scopeNode).getLeaf(nodeIndex, excludedScope,
+          mutableExNodes, ancestorGen);
     }
-    LOG.debug("chooseRandom return {}", ret);
+    LOG.debug("Choosing node[index={},random={}] from \"{}\" available nodes" +
+        " scope=\"{}\", excludedScope=\"{}\", excludeNodes=\"{}\".",
+        nodeIndex, (leafIndex == -1 ? "true" : "false"), availableNodes,
+        scopeNode.getNetworkFullPath(), excludedScope, excludedNodes);
+    LOG.debug("Chosen node = {}", (ret == null ? "not found" : ret.toString()));
     return ret;
   }
@@ -583,13 +586,16 @@
         (node1 == null && node2 == null)) {
       return 0;
     }
+    if (node1 == null || node2 == null) {
+      LOG.warn("One of the nodes is a null pointer");
+      return Integer.MAX_VALUE;
+    }
     int cost = 0;
     netlock.readLock().lock();
     try {
-      if (node1 == null || node2 == null ||
-          (node1.getAncestor(maxLevel - 1) != clusterTree) ||
+      if ((node1.getAncestor(maxLevel - 1) != clusterTree) ||
           (node2.getAncestor(maxLevel - 1) != clusterTree)) {
-        LOG.warn("One of the nodes is a null pointer");
+        LOG.warn("One of the nodes is outside of network topology");
         return Integer.MAX_VALUE;
       }
       int level1 = node1.getLevel();
@@ -630,17 +636,21 @@
    * @param nodes Available replicas with the requested data
    * @param activeLen Number of active nodes at the front of the array
    */
-  public void sortByDistanceCost(Node reader, Node[] nodes, int activeLen) {
+  public List<? extends Node> sortByDistanceCost(Node reader,
+      List<? extends Node> nodes, int activeLen) {
     /** Sort weights for the nodes array */
+    if (reader == null) {
+      return nodes;
+    }
     int[] costs = new int[activeLen];
     for (int i = 0; i < activeLen; i++) {
-      costs[i] = getDistanceCost(reader, nodes[i]);
+      costs[i] = getDistanceCost(reader, nodes.get(i));
     }
     // Add cost/node pairs to a TreeMap to sort
     TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();
     for (int i = 0; i < activeLen; i++) {
       int cost = costs[i];
-      Node node = nodes[i];
+      Node node = nodes.get(i);
       List<Node> list = tree.get(cost);
       if (list == null) {
         list = Lists.newArrayListWithExpectedSize(1);
@@ -648,17 +658,20 @@
       }
       list.add(node);
     }
-    int idx = 0;
+    List<Node> ret = new ArrayList<>();
     for (List<Node> list: tree.values()) {
       if (list != null) {
         Collections.shuffle(list);
         for (Node n: list) {
-          nodes[idx] = n;
-          idx++;
+          ret.add(n);
         }
       }
     }
-    Preconditions.checkState(idx == activeLen, "Wrong number of nodes sorted!");
+    Preconditions.checkState(ret.size() == activeLen,
+        "Wrong number of nodes sorted!");
+    return ret;
   }
   /**
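
A small usage sketch of the list-based sortByDistanceCost introduced above (the topology path, reader, and the replicas list are assumed for illustration; replicas is a List of Node objects already registered in the topology). The method now returns a new list instead of reordering the caller's array, and a null reader returns the input unchanged:

    NetworkTopology cluster = new NetworkTopologyImpl(conf);
    // assume reader and the replica nodes were added to the topology earlier
    Node reader = cluster.getNode("/rack1/node1");
    List<? extends Node> sorted =
        cluster.sortByDistanceCost(reader, replicas, replicas.size());
    // sorted.get(0) is the cheapest (closest) replica; equal costs are shuffled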

View File

@@ -25,9 +25,12 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,12 +41,16 @@
  */
 public final class Pipeline {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(Pipeline.class);
   private final PipelineID id;
   private final ReplicationType type;
   private final ReplicationFactor factor;
   private PipelineState state;
   private Map<DatanodeDetails, Long> nodeStatus;
+  // nodes with ordered distance to client
+  private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
   /**
    * The immutable properties of pipeline object is used in
@@ -112,6 +119,14 @@
     return nodeStatus.keySet().iterator().next();
   }
+  public DatanodeDetails getClosestNode() throws IOException {
+    if (nodesInOrder.get() == null || nodesInOrder.get().isEmpty()) {
+      LOG.debug("Nodes in order is empty, delegate to getFirstNode");
+      return getFirstNode();
+    }
+    return nodesInOrder.get().get(0);
+  }
   public boolean isClosed() {
     return state == PipelineState.CLOSED;
   }
@@ -120,6 +135,18 @@
     return state == PipelineState.OPEN;
   }
+  public void setNodesInOrder(List<DatanodeDetails> nodes) {
+    nodesInOrder.set(nodes);
+  }
+  public List<DatanodeDetails> getNodesInOrder() {
+    if (nodesInOrder.get() == null || nodesInOrder.get().isEmpty()) {
+      LOG.debug("Nodes in order is empty, delegate to getNodes");
+      return getNodes();
+    }
+    return nodesInOrder.get();
+  }
   void reportDatanode(DatanodeDetails dn) throws IOException {
     if (nodeStatus.get(dn) == null) {
       throw new IOException(
@@ -152,6 +179,22 @@
         .addAllMembers(nodeStatus.keySet().stream()
             .map(DatanodeDetails::getProtoBufMessage)
             .collect(Collectors.toList()));
+    // To save the message size on wire, only transfer the node order based on
+    // network topology
+    List<DatanodeDetails> nodes = nodesInOrder.get();
+    if (nodes != null && !nodes.isEmpty()) {
+      for (int i = 0; i < nodes.size(); i++) {
+        Iterator<DatanodeDetails> it = nodeStatus.keySet().iterator();
+        for (int j = 0; j < nodeStatus.keySet().size(); j++) {
+          if (it.next().equals(nodes.get(i))) {
+            builder.addMemberOrders(j);
+            break;
+          }
+        }
+      }
+      LOG.info("Serialize pipeline {} with nodesInOrder{ }", id.toString(),
+          nodes);
+    }
     return builder.build();
   }
@@ -164,10 +207,10 @@
         .setState(PipelineState.fromProtobuf(pipeline.getState()))
         .setNodes(pipeline.getMembersList().stream()
             .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList()))
+        .setNodesInOrder(pipeline.getMemberOrdersList())
         .build();
   }
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -228,6 +271,8 @@
     private ReplicationFactor factor = null;
     private PipelineState state = null;
     private Map<DatanodeDetails, Long> nodeStatus = null;
+    private List<Integer> nodeOrder = null;
+    private List<DatanodeDetails> nodesInOrder = null;
     public Builder() {}
@@ -237,6 +282,7 @@
       this.factor = pipeline.factor;
       this.state = pipeline.state;
       this.nodeStatus = pipeline.nodeStatus;
+      this.nodesInOrder = pipeline.nodesInOrder.get();
     }
     public Builder setId(PipelineID id1) {
@@ -265,13 +311,42 @@
       return this;
     }
+    public Builder setNodesInOrder(List<Integer> orders) {
+      this.nodeOrder = orders;
+      return this;
+    }
     public Pipeline build() {
       Preconditions.checkNotNull(id);
       Preconditions.checkNotNull(type);
       Preconditions.checkNotNull(factor);
       Preconditions.checkNotNull(state);
       Preconditions.checkNotNull(nodeStatus);
-      return new Pipeline(id, type, factor, state, nodeStatus);
+      Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
+      if (nodeOrder != null && !nodeOrder.isEmpty()) {
+        // This branch is for build from ProtoBuf
+        List<DatanodeDetails> nodesWithOrder = new ArrayList<>();
+        for(int i = 0; i < nodeOrder.size(); i++) {
+          int nodeIndex = nodeOrder.get(i);
+          Iterator<DatanodeDetails> it = nodeStatus.keySet().iterator();
+          while(it.hasNext() && nodeIndex >= 0) {
+            DatanodeDetails node = it.next();
+            if (nodeIndex == 0) {
+              nodesWithOrder.add(node);
+              break;
+            }
+            nodeIndex--;
+          }
+        }
+        LOG.info("Deserialize nodesInOrder {} in pipeline {}", nodesWithOrder,
+            id.toString());
+        pipeline.setNodesInOrder(nodesWithOrder);
+      } else if (nodesInOrder != null){
+        // This branch is for pipeline clone
+        pipeline.setNodesInOrder(nodesInOrder);
+      }
+      return pipeline;
     }
   }
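
Worked example (hypothetical values) of the memberOrders wire encoding above: if nodeStatus holds the pipeline members as [dn0, dn1, dn2] and the topology-sorted order for the caller is [dn2, dn0, dn1], getProtobufMessage() emits memberOrders = [2, 0, 1]. On the receiving side, Builder.build() walks the member iterator by each index and reconstructs [dn2, dn0, dn1], so only three small integers travel on the wire instead of repeating the full DatanodeDetails messages.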

View File

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.protocol;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.security.KerberosInfo;
@@ -74,4 +75,12 @@
    * Gets the Clusterid and SCM Id from SCM.
    */
   ScmInfo getScmInfo() throws IOException;
+  /**
+   * Sort datanodes with distance to client.
+   * @param nodes list of network name of each node.
+   * @param clientMachine client address, depends, can be hostname or ipaddress.
+   */
+  List<DatanodeDetails> sortDatanodes(List<String> nodes,
+      String clientMachine) throws IOException;
 }

View File

@@ -24,6 +24,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
@@ -34,6 +35,10 @@
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .SortDatanodesRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .SortDatanodesResponseProto;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -227,6 +232,35 @@
     return builder.build();
   }
+  /**
+   * Sort the datanodes based on distance from client.
+   * @return List<DatanodeDetails></>
+   * @throws IOException
+   */
+  @Override
+  public List<DatanodeDetails> sortDatanodes(List<String> nodes,
+      String clientMachine) throws IOException {
+    SortDatanodesRequestProto request = SortDatanodesRequestProto
+        .newBuilder()
+        .addAllNodeNetworkName(nodes)
+        .setClient(clientMachine)
+        .build();
+    SCMBlockLocationRequest wrapper = createSCMBlockRequest(
+        Type.SortDatanodes)
+        .setSortDatanodesRequest(request)
+        .build();
+    final SCMBlockLocationResponse wrappedResponse =
+        handleError(submitRequest(wrapper));
+    SortDatanodesResponseProto resp =
+        wrappedResponse.getSortDatanodesResponse();
+    List<DatanodeDetails> results = new ArrayList<>(resp.getNodeCount());
+    results.addAll(resp.getNodeList().stream()
+        .map(node -> DatanodeDetails.getFromProtoBuf(node))
+        .collect(Collectors.toList()));
+    return results;
+  }
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
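
A hedged example of calling the new RPC from the Ozone Manager side (the proxy variable and host names are assumptions for illustration, not from this commit); scmBlockLocationClient stands for any ScmBlockLocationProtocol implementation such as the translator above:

    List<String> nodeNames = Arrays.asList(
        "dn1.example.com", "dn2.example.com", "dn3.example.com");
    List<DatanodeDetails> sorted =
        scmBlockLocationClient.sortDatanodes(nodeNames, "client1.example.com");
    // sorted.get(0) is the replica SCM considers closest to the client machine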

View File

@@ -230,7 +230,7 @@
         ReadChunkRequestProto.newBuilder()
             .setBlockID(blockID.getDatanodeBlockIDProtobuf())
             .setChunkData(chunk);
-    String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
+    String id = xceiverClient.getPipeline().getClosestNode().getUuidString();
     ContainerCommandRequestProto.Builder builder =
         ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
             .setContainerID(blockID.getContainerID())
@@ -494,7 +494,7 @@
         GetSmallFileRequestProto
             .newBuilder().setBlock(getBlock)
             .build();
-    String id = client.getPipeline().getFirstNode().getUuidString();
+    String id = client.getPipeline().getClosestNode().getUuidString();
     ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
         .newBuilder()

View File

@@ -35,7 +35,8 @@
   CLOSE_PIPELINE,
   DELETE_CONTAINER,
   IN_SAFE_MODE,
-  FORCE_EXIT_SAFE_MODE;
+  FORCE_EXIT_SAFE_MODE,
+  SORT_DATANODE;
   @Override
   public String getAction() {

View File

@@ -23,6 +23,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
     .AllocateBlockResponse;
 import org.apache.hadoop.hdds.scm.ScmInfo;
@@ -50,6 +51,10 @@
     .SCMBlockLocationRequest;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
     .Status;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .SortDatanodesRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .SortDatanodesResponseProto;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
@@ -114,6 +119,10 @@
       response.setGetScmInfoResponse(
           getScmInfo(request.getGetScmInfoRequest()));
       break;
+    case SortDatanodes:
+      response.setSortDatanodesResponse(
+          sortDatanodes(request.getSortDatanodesRequest()));
+      break;
     default:
       // Should never happen
       throw new IOException("Unknown Operation "+request.getCmdType()+
@@ -193,4 +202,21 @@
         .setScmId(scmInfo.getScmId())
         .build();
   }
+  public SortDatanodesResponseProto sortDatanodes(
+      SortDatanodesRequestProto request) throws ServiceException {
+    SortDatanodesResponseProto.Builder resp =
+        SortDatanodesResponseProto.newBuilder();
+    try {
+      List<String> nodeList = request.getNodeNetworkNameList();
+      final List<DatanodeDetails> results =
+          impl.sortDatanodes(nodeList, request.getClient());
+      if (results != null && results.size() > 0) {
+        results.stream().forEach(dn -> resp.addNode(dn.getProtoBufMessage()));
+      }
+      return resp.build();
+    } catch (IOException ex) {
+      throw new ServiceException(ex);
+    }
+  }
 }

View File

@@ -37,6 +37,7 @@
   AllocateScmBlock = 11;
   DeleteScmKeyBlocks = 12;
   GetScmInfo = 13;
+  SortDatanodes = 14;
 }
 message SCMBlockLocationRequest {
@@ -51,6 +52,7 @@
   optional AllocateScmBlockRequestProto allocateScmBlockRequest = 11;
   optional DeleteScmKeyBlocksRequestProto deleteScmKeyBlocksRequest = 12;
   optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest = 13;
+  optional SortDatanodesRequestProto sortDatanodesRequest = 14;
 }
 message SCMBlockLocationResponse {
@@ -71,6 +73,7 @@
   optional AllocateScmBlockResponseProto allocateScmBlockResponse = 11;
   optional DeleteScmKeyBlocksResponseProto deleteScmKeyBlocksResponse = 12;
   optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse = 13;
+  optional SortDatanodesResponseProto sortDatanodesResponse = 14;
 }
 /**
@@ -185,6 +188,19 @@
   repeated AllocateBlockResponse blocks = 3;
 }
+/**
+ * Datanode sort request sent by OM to SCM, it contains
+ * multiple number of datanodes.
+ */
+message SortDatanodesRequestProto{
+  required string client = 1;
+  repeated string nodeNetworkName = 2;
+}
+message SortDatanodesResponseProto{
+  repeated DatanodeDetailsProto node = 1;
+}
 /**
  * Protocol used from OzoneManager to StorageContainerManager.
  * See request and response messages for details of the RPC calls.

View File

@@ -34,7 +34,9 @@
     required string hostName = 3; // hostname
     repeated Port ports = 4;
     optional string certSerialId = 5; // Certificate serial id.
-    optional string networkLocation = 6; // Network topology location
+    // network name, can be Ip address or host name, depends
+    optional string networkName = 6;
+    optional string networkLocation = 7; // Network topology location
 }
 /**
@@ -71,6 +73,7 @@
     optional ReplicationType type = 4 [default = STAND_ALONE];
     optional ReplicationFactor factor = 5 [default = ONE];
     required PipelineID id = 6;
+    repeated uint32 memberOrders = 7;
 }
 message KeyValue {

View File

@@ -2345,6 +2345,14 @@
     awareness document for xml and yaml topology definition samples.
   </description>
 </property>
+<property>
+  <name>dfs.network.topology.aware.read.enable</name>
+  <value>true</value>
+  <tag>OZONE, PERFORMANCE</tag>
+  <description>
+    Whether to enable topology aware read to improve the read performance.
+  </description>
+</property>
 <property>
   <name>ozone.recon.container.db.impl</name>
   <value>RocksDB</value>

View File

@@ -784,15 +784,16 @@
     for (Node[] nodeList : nodes) {
       int length = nodeList.length;
       while (length > 0) {
-        cluster.sortByDistanceCost(reader, nodeList, length);
-        for (int i = 0; i < nodeList.length; i++) {
-          if ((i + 1) < nodeList.length) {
-            int cost1 = cluster.getDistanceCost(reader, nodeList[i]);
-            int cost2 = cluster.getDistanceCost(reader, nodeList[i + 1]);
+        List<? extends Node> ret = cluster.sortByDistanceCost(reader,
+            Arrays.asList(nodeList), length);
+        for (int i = 0; i < ret.size(); i++) {
+          if ((i + 1) < ret.size()) {
+            int cost1 = cluster.getDistanceCost(reader, ret.get(i));
+            int cost2 = cluster.getDistanceCost(reader, ret.get(i + 1));
             assertTrue("reader:" + (reader != null ?
                 reader.getNetworkFullPath() : "null") +
-                ",node1:" + nodeList[i].getNetworkFullPath() +
-                ",node2:" + nodeList[i + 1].getNetworkFullPath() +
+                ",node1:" + ret.get(i).getNetworkFullPath() +
+                ",node2:" + ret.get(i + 1).getNetworkFullPath() +
                 ",cost1:" + cost1 + ",cost2:" + cost2,
                 cost1 == Integer.MAX_VALUE || cost1 <= cost2);
           }
@@ -803,20 +804,22 @@
     }
     // sort all nodes
-    Node[] nodeList = dataNodes.clone();
+    List<Node> nodeList = Arrays.asList(dataNodes.clone());
     for (Node reader : readers) {
-      int length = nodeList.length;
+      int length = nodeList.size();
       while (length >= 0) {
-        cluster.sortByDistanceCost(reader, nodeList, length);
-        for (int i = 0; i < nodeList.length; i++) {
-          if ((i + 1) < nodeList.length) {
-            int cost1 = cluster.getDistanceCost(reader, nodeList[i]);
-            int cost2 = cluster.getDistanceCost(reader, nodeList[i + 1]);
+        List<? extends Node> sortedNodeList =
+            cluster.sortByDistanceCost(reader, nodeList, length);
+        for (int i = 0; i < sortedNodeList.size(); i++) {
+          if ((i + 1) < sortedNodeList.size()) {
+            int cost1 = cluster.getDistanceCost(reader, sortedNodeList.get(i));
+            int cost2 = cluster.getDistanceCost(
+                reader, sortedNodeList.get(i + 1));
             // node can be removed when called in testConcurrentAccess
             assertTrue("reader:" + (reader != null ?
                 reader.getNetworkFullPath() : "null") +
-                ",node1:" + nodeList[i].getNetworkFullPath() +
-                ",node2:" + nodeList[i + 1].getNetworkFullPath() +
+                ",node1:" + sortedNodeList.get(i).getNetworkFullPath() +
+                ",node2:" + sortedNodeList.get(i + 1).getNetworkFullPath() +
                 ",cost1:" + cost1 + ",cost2:" + cost2,
                 cost1 == Integer.MAX_VALUE || cost1 <= cost2);
           }

View File

@@ -566,6 +566,8 @@
       node = clusterMap.getNode(location + NetConstants.PATH_SEPARATOR_STR +
           address);
     }
+    LOG.debug("Get node for {} return {}", address, (node == null ?
+        "not found" : node.getNetworkFullPath()));
     return node == null ? null : (DatanodeDetails)node;
   }

View File

@@ -25,6 +25,7 @@
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
@@ -33,6 +34,8 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.apache.hadoop.io.IOUtils;
@@ -279,6 +282,35 @@
     }
   }
+  @Override
+  public List<DatanodeDetails> sortDatanodes(List<String> nodes,
+      String clientMachine) throws IOException {
+    boolean auditSuccess = true;
+    try{
+      NodeManager nodeManager = scm.getScmNodeManager();
+      Node client = nodeManager.getNode(clientMachine);
+      List<Node> nodeList = new ArrayList();
+      nodes.stream().forEach(path -> nodeList.add(nodeManager.getNode(path)));
+      List<? extends Node> sortedNodeList = scm.getClusterMap()
+          .sortByDistanceCost(client, nodeList, nodes.size());
+      List<DatanodeDetails> ret = new ArrayList<>();
+      sortedNodeList.stream().forEach(node -> ret.add((DatanodeDetails)node));
+      return ret;
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.SORT_DATANODE, null, ex)
+      );
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.SORT_DATANODE, null)
+        );
+      }
+    }
+  }
   @Override
   public AuditMessage buildAuditMessageForSuccess(
       AuditAction op, Map<String, String> auditMap) {

View File

@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdds.scm.block.BlockManager;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
@@ -65,6 +66,7 @@
   private SCMSafeModeManager scmSafeModeManager;
   private CertificateServer certificateServer;
   private SCMMetadataStore metadataStore;
+  private NetworkTopology networkTopology;
   /**
    * Allows user to specify a version of Node manager to use with this SCM.
@@ -137,6 +139,15 @@
     this.metadataStore = scmMetadataStore;
   }
+  /**
+   * Allows user to specify a custom version of Network Topology Cluster
+   * to be used with this SCM.
+   * @param networkTopology - network topology cluster.
+   */
+  public void setNetworkTopology(NetworkTopology networkTopology) {
+    this.networkTopology = networkTopology;
+  }
   /**
    * Gets SCM Node Manager.
    * @return Node Manager.
@@ -200,4 +211,12 @@
   public SCMMetadataStore getMetadataStore() {
     return metadataStore;
   }
+  /**
+   * Get network topology cluster tree.
+   * @return NetworkTopology.
+   */
+  public NetworkTopology getNetworkTopology() {
+    return networkTopology;
+  }
 }

View File

@@ -374,7 +374,11 @@
   private void initializeSystemManagers(OzoneConfiguration conf,
       SCMConfigurator configurator)
       throws IOException {
-    clusterMap = new NetworkTopologyImpl(conf);
+    if (configurator.getNetworkTopology() != null) {
+      clusterMap = configurator.getNetworkTopology();
+    } else {
+      clusterMap = new NetworkTopologyImpl(conf);
+    }
     if(configurator.getScmNodeManager() != null) {
       scmNodeManager = configurator.getScmNodeManager();

View File

@@ -124,8 +124,8 @@
         + "." + random.nextInt(256)
         + "." + random.nextInt(256)
         + "." + random.nextInt(256);
-    return createDatanodeDetails(uuid.toString(), "localhost", ipAddress,
-        null);
+    return createDatanodeDetails(uuid.toString(), "localhost" + "-" + ipAddress,
+        ipAddress, null);
   }
   /**

View File

@@ -19,6 +19,9 @@
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -83,6 +86,7 @@
   private final Map<UUID, List<SCMCommand>> commandMap;
   private final Node2PipelineMap node2PipelineMap;
   private final Node2ContainerMap node2ContainerMap;
+  private NetworkTopology clusterMap;
   public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
     this.healthyNodes = new LinkedList<>();
@@ -366,6 +370,9 @@
     try {
       node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(),
           Collections.emptySet());
+      if (clusterMap != null) {
+        clusterMap.add(datanodeDetails);
+      }
     } catch (SCMException e) {
       e.printStackTrace();
     }
@@ -453,7 +460,12 @@
   @Override
   public DatanodeDetails getNode(String address) {
-    return null;
+    Node node = clusterMap.getNode(NetConstants.DEFAULT_RACK + "/" + address);
+    return node == null ? null : (DatanodeDetails)node;
+  }
+  public void setNetworkTopology(NetworkTopology topology) {
+    this.clusterMap = topology;
   }
   /**

View File

@@ -51,6 +51,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -1147,6 +1148,15 @@
       List<DatanodeDetails> nodeList = nodeManager.getAllNodes();
       nodeList.stream().forEach(node ->
           Assert.assertTrue(node.getNetworkLocation().equals("/rack1")));
+      // test get node
+      if (useHostname) {
+        Arrays.stream(hostNames).forEach(hostname ->
+            Assert.assertNotNull(nodeManager.getNode(hostname)));
+      } else {
+        Arrays.stream(ipAddress).forEach(ip ->
+            Assert.assertNotNull(nodeManager.getNode(ip)));
+      }
     }
   }
 }

View File

@@ -136,6 +136,7 @@
     ;;
     sh | shell)
       HADOOP_CLASSNAME=org.apache.hadoop.ozone.web.ozShell.OzoneShell
+      HADOOP_OPTS="${HADOOP_OPTS} ${HDFS_OM_SH_OPTS}"
       OZONE_RUN_ARTIFACT_NAME="hadoop-ozone-ozone-manager"
     ;;
     s3)
@@ -171,6 +172,7 @@
     ;;
     scmcli)
      HADOOP_CLASSNAME=org.apache.hadoop.hdds.scm.cli.SCMCLI
+      HADOOP_OPTS="${HADOOP_OPTS} ${HDFS_SCM_CLI_OPTS}"
      OZONE_RUN_ARTIFACT_NAME="hadoop-hdds-tools"
     ;;
     version)

View File

@@ -19,6 +19,7 @@
   datanode_1:
     image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
     privileged: true #required by the profiler
+    container_name: datanode_1
     volumes:
       - ../..:/opt/hadoop
     ports:
@@ -28,11 +29,12 @@
     env_file:
       - ./docker-config
     networks:
-      service_network:
+      net:
        ipv4_address: 10.5.0.4
   datanode_2:
     image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
     privileged: true #required by the profiler
+    container_name: datanode_2
     volumes:
       - ../..:/opt/hadoop
     ports:
@@ -42,11 +44,12 @@
     env_file:
       - ./docker-config
     networks:
-      service_network:
+      net:
        ipv4_address: 10.5.0.5
   datanode_3:
     image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
     privileged: true #required by the profiler
+    container_name: datanode_3
     volumes:
       - ../..:/opt/hadoop
     ports:
@@ -56,11 +59,12 @@
     env_file:
      - ./docker-config
     networks:
-      service_network:
+      net:
        ipv4_address: 10.5.0.6
   datanode_4:
     image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
     privileged: true #required by the profiler
+    container_name: datanode_4
     volumes:
       - ../..:/opt/hadoop
     ports:
@@ -70,11 +74,12 @@
     env_file:
       - ./docker-config
     networks:
-      service_network:
+      net:
        ipv4_address: 10.5.0.7
   om:
     image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
     privileged: true #required by the profiler
+    container_name: om
     volumes:
       - ../..:/opt/hadoop
     ports:
@@ -85,11 +90,12 @@
       - ./docker-config
     command: ["/opt/hadoop/bin/ozone","om"]
     networks:
-      service_network:
+      net:
        ipv4_address: 10.5.0.70
   scm:
-    image: apache/ozone-runner::${HADOOP_RUNNER_VERSION}
+    image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
     privileged: true #required by the profiler
+    container_name: scm
     volumes:
       - ../..:/opt/hadoop
     ports:
@@ -100,10 +106,10 @@
       ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION
     command: ["/opt/hadoop/bin/ozone","scm"]
     networks:
-      service_network:
+      net:
        ipv4_address: 10.5.0.71
 networks:
-  service_network:
+  net:
     driver: bridge
     ipam:
       config:

View File

@ -29,7 +29,8 @@ OZONE-SITE.XML_hdds.profiler.endpoint.enabled=true
HDFS-SITE.XML_rpc.metrics.quantile.enable=true HDFS-SITE.XML_rpc.metrics.quantile.enable=true
HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300 HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
HDFS-SITE.XML_net.topology.node.switch.mapping.impl=org.apache.hadoop.net.TableMapping HDFS-SITE.XML_net.topology.node.switch.mapping.impl=org.apache.hadoop.net.TableMapping
HDFS-SITE.XML_net.topology.table.file.name=/opt/hadoop/compose/ozone-net-topology/network-config HDFS-SITE.XML_net.topology.table.file.name=/opt/hadoop/compose/ozone-topology/network-config
HDFS-SITE.XML_dfs.network.topology.aware.read.enable=true
ASYNC_PROFILER_HOME=/opt/profiler ASYNC_PROFILER_HOME=/opt/profiler
LOG4J.PROPERTIES_log4j.rootLogger=DEBUG, ARF LOG4J.PROPERTIES_log4j.rootLogger=DEBUG, ARF
LOG4J.PROPERTIES_log4j.appender.stdout=org.apache.log4j.ConsoleAppender LOG4J.PROPERTIES_log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@ -46,6 +47,8 @@ LOG4J.PROPERTIES_log4j.appender.ARF.file=/opt/hadoop/logs/${module.name}-${user.
HDDS_DN_OPTS=-Dmodule.name=datanode HDDS_DN_OPTS=-Dmodule.name=datanode
HDFS_OM_OPTS=-Dmodule.name=om HDFS_OM_OPTS=-Dmodule.name=om
HDFS_STORAGECONTAINERMANAGER_OPTS=-Dmodule.name=scm HDFS_STORAGECONTAINERMANAGER_OPTS=-Dmodule.name=scm
HDFS_OM_SH_OPTS=-Dmodule.name=sh
HDFS_SCM_CLI_OPTS=-Dmodule.name=scmcli
#Enable this variable to print out all hadoop rpc traffic to the stdout. See http://byteman.jboss.org/ to define your own instrumentation. #Enable this variable to print out all hadoop rpc traffic to the stdout. See http://byteman.jboss.org/ to define your own instrumentation.
#BYTEMAN_SCRIPT_URL=https://raw.githubusercontent.com/apache/hadoop/trunk/dev-support/byteman/hadooprpc.btm #BYTEMAN_SCRIPT_URL=https://raw.githubusercontent.com/apache/hadoop/trunk/dev-support/byteman/hadooprpc.btm
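Two related knobs appear above: rack resolution via org.apache.hadoop.net.TableMapping, whose table file is a plain two-column list mapping a host or IP to its rack (e.g. 10.5.0.4 /rack1, values illustrative), and the new dfs.network.topology.aware.read.enable flag. A client can flip the same flag programmatically; a minimal sketch using the ScmConfigKeys constant that appears elsewhere in this diff (the configuration object and client variable are placeholders):

// Sketch: build one client with topology-aware reads disabled while the
// compose environment above keeps them enabled cluster-wide.
OzoneConfiguration clientConf = new OzoneConfiguration();
clientConf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false");
OzoneClient client = OzoneClientFactory.getRpcClient(clientConf);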

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.StorageType; import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@ -100,9 +101,11 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE; import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.either; import static org.hamcrest.CoreMatchers.either;
import org.junit.Assert; import org.junit.Assert;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -114,6 +117,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* This is an abstract class to test all the public facing APIs of Ozone * This is an abstract class to test all the public facing APIs of Ozone
@ -124,6 +129,8 @@ import org.junit.Test;
*/ */
public abstract class TestOzoneRpcClientAbstract { public abstract class TestOzoneRpcClientAbstract {
static final Logger LOG =
LoggerFactory.getLogger(TestOzoneRpcClientAbstract.class);
private static MiniOzoneCluster cluster = null; private static MiniOzoneCluster cluster = null;
private static OzoneClient ozClient = null; private static OzoneClient ozClient = null;
private static ObjectStore store = null; private static ObjectStore store = null;
@ -140,7 +147,7 @@ public abstract class TestOzoneRpcClientAbstract {
*/ */
static void startCluster(OzoneConfiguration conf) throws Exception { static void startCluster(OzoneConfiguration conf) throws Exception {
cluster = MiniOzoneCluster.newBuilder(conf) cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(10) .setNumDatanodes(3)
.setScmId(scmId) .setScmId(scmId)
.build(); .build();
cluster.waitForClusterToBeReady(); cluster.waitForClusterToBeReady();
@ -664,6 +671,80 @@ public abstract class TestOzoneRpcClientAbstract {
Assert.assertEquals(value.getBytes().length, keyInfo.getDataSize()); Assert.assertEquals(value.getBytes().length, keyInfo.getDataSize());
} }
/**
* Tests reading a key and a file with network topology awareness enabled.
* @throws IOException
*/
@Test
public void testGetKeyAndFileWithNetworkTopology() throws IOException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String value = "sample value";
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
String keyName = UUID.randomUUID().toString();
// Write data into a key
OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes().length, ReplicationType.RATIS,
THREE, new HashMap<>());
out.write(value.getBytes());
out.close();
// Since the RPC client is outside the cluster, getFirstNode should be
// equal to getClosestNode.
OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
builder.setVolumeName(volumeName).setBucketName(bucketName)
.setKeyName(keyName).setRefreshPipeline(true);
// read key with topology aware read enabled (default)
try {
OzoneInputStream is = bucket.readKey(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading key should success");
}
// read file with topology aware read enabled (default)
try {
OzoneInputStream is = bucket.readFile(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading file should success");
}
// read key with topology aware read disabled
Configuration conf = cluster.getConf();
conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false");
OzoneClient newClient = OzoneClientFactory.getRpcClient(conf);
ObjectStore newStore = newClient.getObjectStore();
OzoneBucket newBucket =
newStore.getVolume(volumeName).getBucket(bucketName);
try {
OzoneInputStream is = newBucket.readKey(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading key should success");
}
// read file with topology aware read disabled
try {
OzoneInputStream is = newBucket.readFile(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading file should success");
}
}
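The reads above rely on a single read(b) call filling the whole buffer, which works for this small value. A slightly more defensive sketch of the same verification (the read loop and try-with-resources are additions for illustration, not part of the committed test):

// Sketch: read the key back fully, compare, and close the stream when done.
try (OzoneInputStream is = bucket.readKey(keyName)) {
  byte[] expected = value.getBytes();
  byte[] actual = new byte[expected.length];
  int off = 0;
  while (off < actual.length) {
    int n = is.read(actual, off, actual.length - off);
    if (n < 0) {
      break;       // end of stream reached early
    }
    off += n;
  }
  Assert.assertArrayEquals(expected, actual);
}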
@Test @Test
public void testPutKeyRatisOneNode() public void testPutKeyRatisOneNode()

View File

@ -19,18 +19,37 @@
package org.apache.hadoop.ozone.client.rpc; package org.apache.hadoop.ozone.client.rpc;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.UUID;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.junit.Assert.fail;
/** /**
* This class is to test all the public facing APIs of Ozone Client with an * This class is to test all the public facing APIs of Ozone Client with an
* active OM Ratis server. * active OM Ratis server.
*/ */
public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract { public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
private static OzoneConfiguration conf;
/** /**
* Create a MiniOzoneCluster for testing. * Create a MiniOzoneCluster for testing.
* Ozone is made active by setting OZONE_ENABLED = true. * Ozone is made active by setting OZONE_ENABLED = true.
@ -41,7 +60,7 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
*/ */
@BeforeClass @BeforeClass
public static void init() throws Exception { public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration(); conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
startCluster(conf); startCluster(conf);
@ -55,4 +74,77 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
shutdownCluster(); shutdownCluster();
} }
/**
* Tests reading a key and a file with network topology awareness enabled.
* @throws IOException
*/
@Test
public void testGetKeyAndFileWithNetworkTopology() throws IOException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String value = "sample value";
getStore().createVolume(volumeName);
OzoneVolume volume = getStore().getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
String keyName = UUID.randomUUID().toString();
// Write data into a key
OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes().length, ReplicationType.RATIS,
THREE, new HashMap<>());
out.write(value.getBytes());
out.close();
// Since the RPC client is outside the cluster, getFirstNode should be
// equal to getClosestNode.
OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
builder.setVolumeName(volumeName).setBucketName(bucketName)
.setKeyName(keyName).setRefreshPipeline(true);
// read key with topology aware read enabled (default)
try {
OzoneInputStream is = bucket.readKey(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading key should success");
}
// read file with topology aware read enabled (default)
try {
OzoneInputStream is = bucket.readFile(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading file should success");
}
// read key with topology aware read disabled
conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false");
OzoneClient newClient = OzoneClientFactory.getRpcClient(conf);
ObjectStore newStore = newClient.getObjectStore();
OzoneBucket newBucket =
newStore.getVolume(volumeName).getBucket(bucketName);
try {
OzoneInputStream is = newBucket.readKey(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading key should success");
}
// read file with topology aware read disabled
try {
OzoneInputStream is = newBucket.readFile(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading file should success");
}
}
} }

View File

@ -36,7 +36,9 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.TestUtils;
@ -44,7 +46,12 @@ import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.net.NodeSchema;
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator; import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@ -53,6 +60,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo; import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@ -71,6 +79,7 @@ import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -80,6 +89,11 @@ import org.mockito.Mockito;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL; import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
/** /**
@ -91,6 +105,7 @@ public class TestKeyManagerImpl {
private static KeyManagerImpl keyManager; private static KeyManagerImpl keyManager;
private static VolumeManagerImpl volumeManager; private static VolumeManagerImpl volumeManager;
private static BucketManagerImpl bucketManager; private static BucketManagerImpl bucketManager;
private static NodeManager nodeManager;
private static StorageContainerManager scm; private static StorageContainerManager scm;
private static ScmBlockLocationProtocol mockScmBlockLocationProtocol; private static ScmBlockLocationProtocol mockScmBlockLocationProtocol;
private static OzoneConfiguration conf; private static OzoneConfiguration conf;
@ -113,9 +128,17 @@ public class TestKeyManagerImpl {
metadataManager = new OmMetadataManagerImpl(conf); metadataManager = new OmMetadataManagerImpl(conf);
volumeManager = new VolumeManagerImpl(metadataManager, conf); volumeManager = new VolumeManagerImpl(metadataManager, conf);
bucketManager = new BucketManagerImpl(metadataManager); bucketManager = new BucketManagerImpl(metadataManager);
NodeManager nodeManager = new MockNodeManager(true, 10); nodeManager = new MockNodeManager(true, 10);
NodeSchema[] schemas = new NodeSchema[]
{ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
NodeSchemaManager schemaManager = NodeSchemaManager.getInstance();
schemaManager.init(schemas, false);
NetworkTopology clusterMap = new NetworkTopologyImpl(schemaManager);
nodeManager.getAllNodes().stream().forEach(node -> clusterMap.add(node));
((MockNodeManager)nodeManager).setNetworkTopology(clusterMap);
SCMConfigurator configurator = new SCMConfigurator(); SCMConfigurator configurator = new SCMConfigurator();
configurator.setScmNodeManager(nodeManager); configurator.setScmNodeManager(nodeManager);
configurator.setNetworkTopology(clusterMap);
scm = TestUtils.getScm(conf, configurator); scm = TestUtils.getScm(conf, configurator);
scm.start(); scm.start();
scm.exitSafeMode(); scm.exitSafeMode();
@ -563,7 +586,7 @@ public class TestKeyManagerImpl {
// lookup for a non-existent file // lookup for a non-existent file
try { try {
keyManager.lookupFile(keyArgs); keyManager.lookupFile(keyArgs, null);
Assert.fail("Lookup file should fail for non existent file"); Assert.fail("Lookup file should fail for non existent file");
} catch (OMException ex) { } catch (OMException ex) {
if (ex.getResult() != OMException.ResultCodes.FILE_NOT_FOUND) { if (ex.getResult() != OMException.ResultCodes.FILE_NOT_FOUND) {
@ -576,14 +599,15 @@ public class TestKeyManagerImpl {
keyArgs.setLocationInfoList( keyArgs.setLocationInfoList(
keySession.getKeyInfo().getLatestVersionLocations().getLocationList()); keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
keyManager.commitKey(keyArgs, keySession.getId()); keyManager.commitKey(keyArgs, keySession.getId());
Assert.assertEquals(keyManager.lookupFile(keyArgs).getKeyName(), keyName); Assert.assertEquals(keyManager.lookupFile(keyArgs, null).getKeyName(),
keyName);
// lookup for created file // lookup for created file
keyArgs = createBuilder() keyArgs = createBuilder()
.setKeyName("") .setKeyName("")
.build(); .build();
try { try {
keyManager.lookupFile(keyArgs); keyManager.lookupFile(keyArgs, null);
Assert.fail("Lookup file should fail for a directory"); Assert.fail("Lookup file should fail for a directory");
} catch (OMException ex) { } catch (OMException ex) {
if (ex.getResult() != OMException.ResultCodes.NOT_A_FILE) { if (ex.getResult() != OMException.ResultCodes.NOT_A_FILE) {
@ -596,6 +620,81 @@ public class TestKeyManagerImpl {
return createBuilder().setKeyName(toKeyName).build(); return createBuilder().setKeyName(toKeyName).build();
} }
@Test
public void testLookupKeyWithLocation() throws IOException {
String keyName = RandomStringUtils.randomAlphabetic(5);
OmKeyArgs keyArgs = createBuilder()
.setKeyName(keyName)
.build();
// lookup for a non-existent key
try {
keyManager.lookupKey(keyArgs, null);
Assert.fail("Lookup key should fail for non existent key");
} catch (OMException ex) {
if (ex.getResult() != OMException.ResultCodes.KEY_NOT_FOUND) {
throw ex;
}
}
// create a key
OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
// pick 3 datanodes from the cluster map
List<DatanodeDetails> nodeList = new ArrayList<>();
nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
0, null, null, null, null, 0));
nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
1, null, null, null, null, 0));
nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
2, null, null, null, null, 0));
Assume.assumeFalse(nodeList.get(0).equals(nodeList.get(1)));
Assume.assumeFalse(nodeList.get(0).equals(nodeList.get(2)));
// create a pipeline using 3 datanodes
Pipeline pipeline = scm.getPipelineManager().createPipeline(
ReplicationType.RATIS, ReplicationFactor.THREE, nodeList);
List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
locationInfoList.add(
new OmKeyLocationInfo.Builder().setPipeline(pipeline)
.setBlockID(new BlockID(1L, 1L)).build());
keyArgs.setLocationInfoList(locationInfoList);
keyManager.commitKey(keyArgs, keySession.getId());
OmKeyInfo key = keyManager.lookupKey(keyArgs, null);
Assert.assertEquals(key.getKeyName(), keyName);
List<OmKeyLocationInfo> keyLocations =
key.getLatestVersionLocations().getLocationList();
DatanodeDetails leader =
keyLocations.get(0).getPipeline().getFirstNode();
DatanodeDetails follower1 =
keyLocations.get(0).getPipeline().getNodes().get(1);
DatanodeDetails follower2 =
keyLocations.get(0).getPipeline().getNodes().get(2);
Assert.assertNotEquals(leader, follower1);
Assert.assertNotEquals(follower1, follower2);
// lookup key, leader as client
OmKeyInfo key1 = keyManager.lookupKey(keyArgs, leader.getNetworkName());
Assert.assertEquals(leader, key1.getLatestVersionLocations()
.getLocationList().get(0).getPipeline().getClosestNode());
// lookup key, follower1 as client
OmKeyInfo key2 = keyManager.lookupKey(keyArgs, follower1.getNetworkName());
Assert.assertEquals(follower1, key2.getLatestVersionLocations()
.getLocationList().get(0).getPipeline().getClosestNode());
// lookup key, follower2 as client
OmKeyInfo key3 = keyManager.lookupKey(keyArgs, follower2.getNetworkName());
Assert.assertEquals(follower2, key3.getLatestVersionLocations()
.getLocationList().get(0).getPipeline().getClosestNode());
// lookup key, with a client address that is not in the topology
OmKeyInfo key4 = keyManager.lookupKey(keyArgs,
"/d=default-drack/127.0.0.1");
Assert.assertEquals(leader, key4.getLatestVersionLocations()
.getLocationList().get(0).getPipeline().getClosestNode());
}
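The last case passes an address that is not registered in the cluster map; the assertion that the leader stays closest suggests that an unresolvable hint leaves the pipeline order untouched. A minimal sketch of that reading, under exactly that assumption (accessors are the ones used elsewhere in this diff):

// Sketch: for a client the topology does not know, the sorted pipeline should
// degenerate to the original order, so the first node is also the closest.
OmKeyInfo key5 = keyManager.lookupKey(keyArgs, "/d=default-drack/127.0.0.1");
Pipeline p = key5.getLatestVersionLocations()
    .getLocationList().get(0).getPipeline();
Assert.assertEquals(p.getFirstNode(), p.getClosestNode());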
@Test @Test
public void testListStatus() throws IOException { public void testListStatus() throws IOException {
String superDir = RandomStringUtils.randomAlphabetic(5); String superDir = RandomStringUtils.randomAlphabetic(5);

View File

@ -258,7 +258,7 @@ public class TestOmMetrics {
Mockito.doReturn(null).when(mockKm).openKey(null); Mockito.doReturn(null).when(mockKm).openKey(null);
Mockito.doNothing().when(mockKm).deleteKey(null); Mockito.doNothing().when(mockKm).deleteKey(null);
Mockito.doReturn(null).when(mockKm).lookupKey(null); Mockito.doReturn(null).when(mockKm).lookupKey(null, "");
Mockito.doReturn(null).when(mockKm).listKeys(null, null, null, null, 0); Mockito.doReturn(null).when(mockKm).listKeys(null, null, null, null, 0);
Mockito.doNothing().when(mockKm).commitKey(any(OmKeyArgs.class), anyLong()); Mockito.doNothing().when(mockKm).commitKey(any(OmKeyArgs.class), anyLong());
Mockito.doReturn(null).when(mockKm).initiateMultipartUpload( Mockito.doReturn(null).when(mockKm).initiateMultipartUpload(
@ -293,7 +293,7 @@ public class TestOmMetrics {
// inject exception to test for Failure Metrics // inject exception to test for Failure Metrics
Mockito.doThrow(exception).when(mockKm).openKey(null); Mockito.doThrow(exception).when(mockKm).openKey(null);
Mockito.doThrow(exception).when(mockKm).deleteKey(null); Mockito.doThrow(exception).when(mockKm).deleteKey(null);
Mockito.doThrow(exception).when(mockKm).lookupKey(null); Mockito.doThrow(exception).when(mockKm).lookupKey(null, "");
Mockito.doThrow(exception).when(mockKm).listKeys( Mockito.doThrow(exception).when(mockKm).listKeys(
null, null, null, null, 0); null, null, null, null, 0);
Mockito.doThrow(exception).when(mockKm).commitKey(any(OmKeyArgs.class), Mockito.doThrow(exception).when(mockKm).commitKey(any(OmKeyArgs.class),

View File

@ -127,10 +127,12 @@ public interface KeyManager extends OzoneManagerFS {
* DistributedStorageHandler will use to access the data on datanode. * DistributedStorageHandler will use to access the data on datanode.
* *
* @param args the args of the key provided by client. * @param args the args of the key provided by client.
* @param clientAddress a hint to the key manager to order the datanodes in the
* returned pipeline by distance from the client.
* @return a OmKeyInfo instance client uses to talk to container. * @return a OmKeyInfo instance client uses to talk to container.
* @throws IOException * @throws IOException
*/ */
OmKeyInfo lookupKey(OmKeyArgs args) throws IOException; OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress) throws IOException;
/** /**
* Renames an existing key within a bucket. * Renames an existing key within a bucket.

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@ -663,7 +664,8 @@ public class KeyManagerImpl implements KeyManager {
} }
@Override @Override
public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { public OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress)
throws IOException {
Preconditions.checkNotNull(args); Preconditions.checkNotNull(args);
String volumeName = args.getVolumeName(); String volumeName = args.getVolumeName();
String bucketName = args.getBucketName(); String bucketName = args.getBucketName();
@ -718,6 +720,7 @@ public class KeyManagerImpl implements KeyManager {
}); });
} }
} }
sortDatanodeInPipeline(value, clientAddress);
return value; return value;
} catch (IOException ex) { } catch (IOException ex) {
LOG.debug("Get key failed for volume:{} bucket:{} key:{}", LOG.debug("Get key failed for volume:{} bucket:{} key:{}",
@ -1855,7 +1858,8 @@ public class KeyManagerImpl implements KeyManager {
* invalid arguments * invalid arguments
*/ */
@Override @Override
public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException { public OmKeyInfo lookupFile(OmKeyArgs args, String clientAddress)
throws IOException {
Preconditions.checkNotNull(args, "Key args can not be null"); Preconditions.checkNotNull(args, "Key args can not be null");
String volumeName = args.getVolumeName(); String volumeName = args.getVolumeName();
String bucketName = args.getBucketName(); String bucketName = args.getBucketName();
@ -1865,6 +1869,7 @@ public class KeyManagerImpl implements KeyManager {
try { try {
OzoneFileStatus fileStatus = getFileStatus(args); OzoneFileStatus fileStatus = getFileStatus(args);
if (fileStatus.isFile()) { if (fileStatus.isFile()) {
sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress);
return fileStatus.getKeyInfo(); return fileStatus.getKeyInfo();
} }
//if key is not of type file or if key is not found we throw an exception //if key is not of type file or if key is not found we throw an exception
@ -2052,4 +2057,31 @@ public class KeyManagerImpl implements KeyManager {
return encInfo; return encInfo;
} }
private void sortDatanodeInPipeline(OmKeyInfo keyInfo, String clientMachine) {
if (keyInfo != null && clientMachine != null && !clientMachine.isEmpty()) {
for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) {
key.getLocationList().forEach(k -> {
List<DatanodeDetails> nodes = k.getPipeline().getNodes();
List<String> nodeList = new ArrayList<>();
nodes.stream().forEach(node ->
nodeList.add(node.getNetworkName()));
try {
List<DatanodeDetails> sortedNodes = scmClient.getBlockClient()
.sortDatanodes(nodeList, clientMachine);
k.getPipeline().setNodesInOrder(sortedNodes);
LOG.debug("Sort datanodes {} for client {}, return {}", nodes,
clientMachine, sortedNodes);
} catch (IOException e) {
LOG.warn("Unable to sort datanodes based on distance to " +
"client, volume=" + keyInfo.getVolumeName() +
", bucket=" + keyInfo.getBucketName() +
", key=" + keyInfo.getKeyName() +
", client=" + clientMachine +
", datanodes=" + nodes.toString() +
", exception=" + e.getMessage());
}
});
}
}
}
} }
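sortDatanodeInPipeline only forwards the pipeline's node network names plus the client hint to SCM via ScmBlockLocationProtocol.sortDatanodes and records the answer with setNodesInOrder. To make that contract concrete, here is a purely illustrative stand-in for the server-side ordering, not the SCM implementation (the class, helper name, and rack-matching heuristic are assumptions):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;

// Hypothetical sketch of the ordering contract: the client's own node first,
// then nodes whose network location (e.g. "/rack1") matches the client's, then the rest.
final class TopologySortSketch {
  static List<DatanodeDetails> sortByDistance(List<DatanodeDetails> nodes,
      String clientNetworkName, String clientLocation) {
    List<DatanodeDetails> sorted = new ArrayList<>(nodes);
    sorted.sort(Comparator.comparingInt((DatanodeDetails dn) -> {
      if (dn.getNetworkName().equals(clientNetworkName)) {
        return 0;   // a replica lives on the client machine itself
      }
      return dn.getNetworkLocation().equals(clientLocation) ? 1 : 2;  // same rack vs. remote
    }));
    return sorted;
  }
}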

View File

@ -2347,7 +2347,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
boolean auditSuccess = true; boolean auditSuccess = true;
try { try {
metrics.incNumKeyLookups(); metrics.incNumKeyLookups();
return keyManager.lookupKey(args); return keyManager.lookupKey(args, getClientAddress());
} catch (Exception ex) { } catch (Exception ex) {
metrics.incNumKeyLookupFails(); metrics.incNumKeyLookupFails();
auditSuccess = false; auditSuccess = false;
@ -2550,6 +2550,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
} }
} }
private static String getClientAddress() {
String clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { // not an RPC client
clientMachine = "";
}
return clientMachine;
}
@Override @Override
public String getRpcPort() { public String getRpcPort() {
return "" + omRpcAddress.getPort(); return "" + omRpcAddress.getPort();
@ -2975,7 +2983,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
boolean auditSuccess = true; boolean auditSuccess = true;
try { try {
metrics.incNumLookupFile(); metrics.incNumLookupFile();
return keyManager.lookupFile(args); return keyManager.lookupFile(args, getClientAddress());
} catch (Exception ex) { } catch (Exception ex) {
metrics.incNumLookupFileFails(); metrics.incNumLookupFileFails();
auditSuccess = false; auditSuccess = false;

View File

@ -38,7 +38,16 @@ public interface OzoneManagerFS extends IOzoneAcl {
OpenKeySession createFile(OmKeyArgs args, boolean isOverWrite, OpenKeySession createFile(OmKeyArgs args, boolean isOverWrite,
boolean isRecursive) throws IOException; boolean isRecursive) throws IOException;
OmKeyInfo lookupFile(OmKeyArgs args) throws IOException; /**
* Looks up a file and returns its info to the client.
*
* @param args the args of the key provided by the client.
* @param clientAddress a hint to the key manager to order the datanodes in the
* returned pipeline by distance from the client.
* @return an OmKeyInfo instance the client uses to talk to the container.
* @throws IOException
*/
OmKeyInfo lookupFile(OmKeyArgs args, String clientAddress) throws IOException;
List<OzoneFileStatus> listStatus(OmKeyArgs keyArgs, boolean recursive, List<OzoneFileStatus> listStatus(OmKeyArgs keyArgs, boolean recursive,
String startKey, long numEntries) throws IOException; String startKey, long numEntries) throws IOException;

View File

@ -182,6 +182,12 @@ public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol {
return builder.build(); return builder.build();
} }
@Override
public List<DatanodeDetails> sortDatanodes(List<String> nodes,
String clientMachine) throws IOException {
return null;
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {