diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 8dd3753887f..3cc6fa164dd 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; @@ -64,7 +65,7 @@ import java.util.concurrent.TimeoutException; /** - * A Client for the storageContainer protocol. + * A Client for the storageContainer protocol for read object data. */ public class XceiverClientGrpc extends XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class); @@ -76,6 +77,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { private final Semaphore semaphore; private boolean closed = false; private SecurityConfig secConfig; + private final boolean topologyAwareRead; /** * Constructs a client that can communicate with the Container framework on @@ -96,6 +98,9 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { this.metrics = XceiverClientManager.getXceiverClientMetrics(); this.channels = new HashMap<>(); this.asyncStubs = new HashMap<>(); + this.topologyAwareRead = Boolean.parseBoolean(config.get( + ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, + ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT)); } /** @@ -103,9 +108,10 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { */ @Override public void connect() throws Exception { - // leader by default is the 1st datanode in the datanode list of pipleline - DatanodeDetails dn = this.pipeline.getFirstNode(); - // just make a connection to the 1st datanode at the beginning + // connect to the closest node, if closest node doesn't exist, delegate to + // first node, which is usually the leader in the pipeline. + DatanodeDetails dn = this.pipeline.getClosestNode(); + // just make a connection to the picked datanode at the beginning connectToDatanode(dn, null); } @@ -114,9 +120,11 @@ public void connect() throws Exception { */ @Override public void connect(String encodedToken) throws Exception { - // leader by default is the 1st datanode in the datanode list of pipleline - DatanodeDetails dn = this.pipeline.getFirstNode(); - // just make a connection to the 1st datanode at the beginning + // connect to the closest node, if closest node doesn't exist, delegate to + // first node, which is usually the leader in the pipeline. 
+ DatanodeDetails dn; + dn = this.pipeline.getClosestNode(); + // just make a connection to the picked datanode at the beginning connectToDatanode(dn, encodedToken); } @@ -132,7 +140,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken) // Add credential context to the client call String userName = UserGroupInformation.getCurrentUser().getShortUserName(); - LOG.debug("Connecting to server Port : " + dn.getIpAddress()); + LOG.debug("Nodes in pipeline : {}", pipeline.getNodes().toString()); + LOG.debug("Connecting to server : {}", dn.getIpAddress()); NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext() .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) @@ -252,7 +261,15 @@ private XceiverClientReply sendCommandWithRetry( // TODO: cache the correct leader info in here, so that any subsequent calls // should first go to leader XceiverClientReply reply = new XceiverClientReply(null); - for (DatanodeDetails dn : pipeline.getNodes()) { + List datanodeList; + if ((request.getCmdType() == ContainerProtos.Type.ReadChunk || + request.getCmdType() == ContainerProtos.Type.GetSmallFile) && + topologyAwareRead) { + datanodeList = pipeline.getNodesInOrder(); + } else { + datanodeList = pipeline.getNodes(); + } + for (DatanodeDetails dn : datanodeList) { try { LOG.debug("Executing command " + request + " on datanode " + dn); // In case the command gets retried on a 2nd datanode, @@ -349,6 +366,8 @@ private XceiverClientReply sendCommandAsync( reconnect(dn, token); } + LOG.debug("Send command {} to datanode {}", request.getCmdType().toString(), + dn.getNetworkFullPath()); final CompletableFuture replyFuture = new CompletableFuture<>(); semaphore.acquire(); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index f9b5e6d05cb..57799aab4d0 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; @@ -57,7 +59,8 @@ * not being used for a period of time. */ public class XceiverClientManager implements Closeable { - + private static final Logger LOG = + LoggerFactory.getLogger(XceiverClientManager.class); //TODO : change this to SCM configuration class private final Configuration conf; private final Cache clientCache; @@ -65,6 +68,7 @@ public class XceiverClientManager implements Closeable { private static XceiverClientMetrics metrics; private boolean isSecurityEnabled; + private final boolean topologyAwareRead; /** * Creates a new XceiverClientManager. * @@ -98,6 +102,9 @@ public void onRemoval( } } }).build(); + topologyAwareRead = Boolean.parseBoolean(conf.get( + ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, + ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT)); } @VisibleForTesting @@ -118,12 +125,32 @@ public Cache getClientCache() { */ public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException { + return acquireClient(pipeline, false); + } + + /** + * Acquires a XceiverClientSpi connected to a container for read. 
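The net effect on the client read path: with dfs.network.topology.aware.read.enable on, the two read-only commands (ReadChunk and GetSmallFile) try the pipeline's topology-sorted nodes first, every other command keeps the declared order, and the initial connect() already targets the closest node with the first node as fallback. A minimal sketch of that routing decision, reusing the patch's own identifiers; the helper method itself is illustrative, not part of the change:

    // Condenses the branch added to sendCommandWithRetry() above.
    // topologyAwareRead holds the parsed value of
    // dfs.network.topology.aware.read.enable.
    private List<DatanodeDetails> datanodesToTry(
        ContainerCommandRequestProto request) {
      boolean readCmd =
          request.getCmdType() == ContainerProtos.Type.ReadChunk
              || request.getCmdType() == ContainerProtos.Type.GetSmallFile;
      return (readCmd && topologyAwareRead)
          ? pipeline.getNodesInOrder()   // closest replica first
          : pipeline.getNodes();         // declared pipeline order
    }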
+ * + * If there is already a cached XceiverClientSpi, simply return + * the cached otherwise create a new one. + * + * @param pipeline the container pipeline for the client connection + * @return XceiverClientSpi connected to a container + * @throws IOException if a XceiverClientSpi cannot be acquired + */ + public XceiverClientSpi acquireClientForReadData(Pipeline pipeline) + throws IOException { + return acquireClient(pipeline, true); + } + + private XceiverClientSpi acquireClient(Pipeline pipeline, boolean read) + throws IOException { Preconditions.checkNotNull(pipeline); Preconditions.checkArgument(pipeline.getNodes() != null); Preconditions.checkArgument(!pipeline.getNodes().isEmpty()); synchronized (clientCache) { - XceiverClientSpi info = getClient(pipeline); + XceiverClientSpi info = getClient(pipeline, read); info.incrementReference(); return info; } @@ -136,12 +163,28 @@ public XceiverClientSpi acquireClient(Pipeline pipeline) * @param invalidateClient if true, invalidates the client in cache */ public void releaseClient(XceiverClientSpi client, boolean invalidateClient) { + releaseClient(client, invalidateClient, false); + } + + /** + * Releases a read XceiverClientSpi after use. + * + * @param client client to release + * @param invalidateClient if true, invalidates the client in cache + */ + public void releaseClientForReadData(XceiverClientSpi client, + boolean invalidateClient) { + releaseClient(client, invalidateClient, true); + } + + private void releaseClient(XceiverClientSpi client, boolean invalidateClient, + boolean read) { Preconditions.checkNotNull(client); synchronized (clientCache) { client.decrementReference(); if (invalidateClient) { Pipeline pipeline = client.getPipeline(); - String key = pipeline.getId().getId().toString() + pipeline.getType(); + String key = getPipelineCacheKey(pipeline, read); XceiverClientSpi cachedClient = clientCache.getIfPresent(key); if (cachedClient == client) { clientCache.invalidate(key); @@ -150,11 +193,13 @@ public void releaseClient(XceiverClientSpi client, boolean invalidateClient) { } } - private XceiverClientSpi getClient(Pipeline pipeline) + private XceiverClientSpi getClient(Pipeline pipeline, boolean forRead) throws IOException { HddsProtos.ReplicationType type = pipeline.getType(); try { - String key = pipeline.getId().getId().toString() + type; + // create different client for read different pipeline node based on + // network topology + String key = getPipelineCacheKey(pipeline, forRead); // Append user short name to key to prevent a different user // from using same instance of xceiverClient. key = isSecurityEnabled ? @@ -184,6 +229,19 @@ public XceiverClientSpi call() throws Exception { } } + private String getPipelineCacheKey(Pipeline pipeline, boolean forRead) { + String key = pipeline.getId().getId().toString() + pipeline.getType(); + if (topologyAwareRead && forRead) { + try { + key += pipeline.getClosestNode().getHostName(); + } catch (IOException e) { + LOG.error("Failed to get closest node to create pipeline cache key:" + + e.getMessage()); + } + } + return key; + } + /** * Close and remove all the cached clients. 
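The point of folding the closest node into getPipelineCacheKey() is that read clients stop sharing a cache slot with write clients for the same pipeline, and readers whose closest replicas differ get separate gRPC channels. Callers are expected to pair the new acquire/release methods, which is what BlockInputStream does in the next hunk; a minimal usage sketch:

    XceiverClientSpi client =
        xceiverClientManager.acquireClientForReadData(pipeline);
    try {
      // issue ReadChunk / GetSmallFile calls through this client
    } finally {
      // pass true instead of false to also evict the cached instance
      xceiverClientManager.releaseClientForReadData(client, false);
    }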
*/ diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index e66db5ffb61..35807f49842 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -151,7 +151,7 @@ protected List getChunkInfos() throws IOException { pipeline = Pipeline.newBuilder(pipeline) .setType(HddsProtos.ReplicationType.STAND_ALONE).build(); } - xceiverClient = xceiverClientManager.acquireClient(pipeline); + xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline); boolean success = false; List chunks; try { @@ -170,7 +170,7 @@ protected List getChunkInfos() throws IOException { success = true; } finally { if (!success) { - xceiverClientManager.releaseClient(xceiverClient, false); + xceiverClientManager.releaseClientForReadData(xceiverClient, false); } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java index 34de02899dc..31e4df03e32 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java @@ -76,6 +76,7 @@ protected DatanodeDetails(DatanodeDetails datanodeDetails) { this.ipAddress = datanodeDetails.ipAddress; this.hostName = datanodeDetails.hostName; this.ports = datanodeDetails.ports; + this.setNetworkName(datanodeDetails.getNetworkName()); } /** @@ -192,6 +193,12 @@ public static DatanodeDetails getFromProtoBuf( builder.addPort(newPort( Port.Name.valueOf(port.getName().toUpperCase()), port.getValue())); } + if (datanodeDetailsProto.hasNetworkLocation()) { + builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation()); + } + if (datanodeDetailsProto.hasNetworkName()) { + builder.setNetworkName(datanodeDetailsProto.getNetworkName()); + } return builder.build(); } @@ -213,6 +220,7 @@ public HddsProtos.DatanodeDetailsProto getProtoBufMessage() { builder.setCertSerialId(certSerialId); } builder.setNetworkLocation(getNetworkLocation()); + builder.setNetworkName(getNetworkName()); for (Port port : ports) { builder.addPorts(HddsProtos.Port.newBuilder() @@ -268,6 +276,7 @@ public static final class Builder { private String id; private String ipAddress; private String hostName; + private String networkName; private String networkLocation; private List ports; private String certSerialId; @@ -313,6 +322,17 @@ public Builder setHostName(String host) { return this; } + /** + * Sets the network name of DataNode. + * + * @param name network name + * @return DatanodeDetails.Builder + */ + public Builder setNetworkName(String name) { + this.networkName = name; + return this; + } + /** * Sets the network location of DataNode. 
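With networkName copied in the copy constructor, written by getProtoBufMessage() and restored in getFromProtoBuf(), both topology fields survive serialization. A small round-trip sketch; the sample values are made up, and the uuid/ip/host setters are the pre-existing Builder methods (only setNetworkName is new here):

    DatanodeDetails dn = DatanodeDetails.newBuilder()
        .setUuid(UUID.randomUUID().toString())
        .setIpAddress("10.5.0.4")
        .setHostName("dn1")
        .setNetworkName("10.5.0.4")
        .setNetworkLocation("/rack1")
        .build();

    DatanodeDetails copy =
        DatanodeDetails.getFromProtoBuf(dn.getProtoBufMessage());
    // networkName and networkLocation are preserved across the round trip.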
* @@ -358,8 +378,12 @@ public DatanodeDetails build() { if (networkLocation == null) { networkLocation = NetConstants.DEFAULT_RACK; } - return new DatanodeDetails(id, ipAddress, hostName, networkLocation, - ports, certSerialId); + DatanodeDetails dn = new DatanodeDetails(id, ipAddress, hostName, + networkLocation, ports, certSerialId); + if (networkName != null) { + dn.setNetworkName(networkName); + } + return dn; } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 1213dee4f8a..4b3b89db54f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -368,6 +368,10 @@ public final class ScmConfigKeys { "ozone.scm.network.topology.schema.file"; public static final String OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT = "network-topology-default.xml"; + public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED = + "dfs.network.topology.aware.read.enable"; + public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT = + "true"; public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled"; public static final boolean HDDS_TRACING_ENABLED_DEFAULT = true; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java index a3d3680e475..8d8571ddb0a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.net; import java.util.Collection; +import java.util.List; /** * The interface defines a network topology. 
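Everything above is gated on the new dfs.network.topology.aware.read.enable key: a string default of "true", read with Boolean.parseBoolean, and documented in ozone-default.xml further down. Turning it off per client is a one-liner, which is exactly what the integration tests at the end of this patch do; a minimal sketch:

    OzoneConfiguration conf = new OzoneConfiguration();
    // Reads fall back to the declared pipeline order (first node first).
    conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false");
    OzoneClient client = OzoneClientFactory.getRpcClient(conf);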
@@ -246,5 +247,6 @@ Node getNode(int leafIndex, String scope, String excludedScope, * @param nodes Available replicas with the requested data * @param activeLen Number of active nodes at the front of the array */ - void sortByDistanceCost(Node reader, Node[] nodes, int activeLen); + List sortByDistanceCost(Node reader, + List nodes, int activeLen); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java index 8613ed7116a..1a03215a513 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -545,10 +546,6 @@ private Node chooseNodeInternal(String scope, int leafIndex, ancestorGen); return null; } - LOG.debug("Choosing random from \"{}\" available nodes on node \"{}\"," + - " scope=\"{}\", excludedScope=\"{}\", excludeNodes=\"{}\".", - availableNodes, scopeNode, scopeNode.getNetworkFullPath(), - excludedScope, excludedNodes); // scope is a Leaf node if (!(scopeNode instanceof InnerNode)) { @@ -556,15 +553,21 @@ private Node chooseNodeInternal(String scope, int leafIndex, } Node ret; + int nodeIndex; if (leafIndex >= 0) { - ret = ((InnerNode)scopeNode).getLeaf(leafIndex % availableNodes, - excludedScope, mutableExNodes, ancestorGen); + nodeIndex = leafIndex % availableNodes; + ret = ((InnerNode)scopeNode).getLeaf(nodeIndex, excludedScope, + mutableExNodes, ancestorGen); } else { - final int index = ThreadLocalRandom.current().nextInt(availableNodes); - ret = ((InnerNode)scopeNode).getLeaf(index, excludedScope, mutableExNodes, - ancestorGen); + nodeIndex = ThreadLocalRandom.current().nextInt(availableNodes); + ret = ((InnerNode)scopeNode).getLeaf(nodeIndex, excludedScope, + mutableExNodes, ancestorGen); } - LOG.debug("chooseRandom return {}", ret); + LOG.debug("Choosing node[index={},random={}] from \"{}\" available nodes" + + " scope=\"{}\", excludedScope=\"{}\", excludeNodes=\"{}\".", + nodeIndex, (leafIndex == -1 ? "true" : "false"), availableNodes, + scopeNode.getNetworkFullPath(), excludedScope, excludedNodes); + LOG.debug("Chosen node = {}", (ret == null ? 
"not found" : ret.toString())); return ret; } @@ -583,13 +586,16 @@ public int getDistanceCost(Node node1, Node node2) { (node1 == null && node2 == null)) { return 0; } + if (node1 == null || node2 == null) { + LOG.warn("One of the nodes is a null pointer"); + return Integer.MAX_VALUE; + } int cost = 0; netlock.readLock().lock(); try { - if (node1 == null || node2 == null || - (node1.getAncestor(maxLevel - 1) != clusterTree) || + if ((node1.getAncestor(maxLevel - 1) != clusterTree) || (node2.getAncestor(maxLevel - 1) != clusterTree)) { - LOG.warn("One of the nodes is a null pointer"); + LOG.warn("One of the nodes is outside of network topology"); return Integer.MAX_VALUE; } int level1 = node1.getLevel(); @@ -630,17 +636,21 @@ public int getDistanceCost(Node node1, Node node2) { * @param nodes Available replicas with the requested data * @param activeLen Number of active nodes at the front of the array */ - public void sortByDistanceCost(Node reader, Node[] nodes, int activeLen) { + public List sortByDistanceCost(Node reader, + List nodes, int activeLen) { /** Sort weights for the nodes array */ + if (reader == null) { + return nodes; + } int[] costs = new int[activeLen]; for (int i = 0; i < activeLen; i++) { - costs[i] = getDistanceCost(reader, nodes[i]); + costs[i] = getDistanceCost(reader, nodes.get(i)); } // Add cost/node pairs to a TreeMap to sort TreeMap> tree = new TreeMap>(); for (int i = 0; i < activeLen; i++) { int cost = costs[i]; - Node node = nodes[i]; + Node node = nodes.get(i); List list = tree.get(cost); if (list == null) { list = Lists.newArrayListWithExpectedSize(1); @@ -648,17 +658,20 @@ public void sortByDistanceCost(Node reader, Node[] nodes, int activeLen) { } list.add(node); } - int idx = 0; + + List ret = new ArrayList<>(); for (List list: tree.values()) { if (list != null) { Collections.shuffle(list); for (Node n: list) { - nodes[idx] = n; - idx++; + ret.add(n); } } } - Preconditions.checkState(idx == activeLen, "Wrong number of nodes sorted!"); + + Preconditions.checkState(ret.size() == activeLen, + "Wrong number of nodes sorted!"); + return ret; } /** diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 7f75dd1b000..0a91f6ba7ca 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -25,9 +25,12 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -38,12 +41,16 @@ */ public final class Pipeline { + private static final Logger LOG = LoggerFactory + .getLogger(Pipeline.class); private final PipelineID id; private final ReplicationType type; private final ReplicationFactor factor; private PipelineState state; private Map nodeStatus; + // nodes with ordered distance to client + private ThreadLocal> nodesInOrder = new ThreadLocal<>(); /** * The immutable properties of pipeline object is used in @@ -112,6 +119,14 @@ public DatanodeDetails getFirstNode() throws IOException { return nodeStatus.keySet().iterator().next(); } + public 
DatanodeDetails getClosestNode() throws IOException { + if (nodesInOrder.get() == null || nodesInOrder.get().isEmpty()) { + LOG.debug("Nodes in order is empty, delegate to getFirstNode"); + return getFirstNode(); + } + return nodesInOrder.get().get(0); + } + public boolean isClosed() { return state == PipelineState.CLOSED; } @@ -120,6 +135,18 @@ public boolean isOpen() { return state == PipelineState.OPEN; } + public void setNodesInOrder(List nodes) { + nodesInOrder.set(nodes); + } + + public List getNodesInOrder() { + if (nodesInOrder.get() == null || nodesInOrder.get().isEmpty()) { + LOG.debug("Nodes in order is empty, delegate to getNodes"); + return getNodes(); + } + return nodesInOrder.get(); + } + void reportDatanode(DatanodeDetails dn) throws IOException { if (nodeStatus.get(dn) == null) { throw new IOException( @@ -152,6 +179,22 @@ public HddsProtos.Pipeline getProtobufMessage() .addAllMembers(nodeStatus.keySet().stream() .map(DatanodeDetails::getProtoBufMessage) .collect(Collectors.toList())); + // To save the message size on wire, only transfer the node order based on + // network topology + List nodes = nodesInOrder.get(); + if (nodes != null && !nodes.isEmpty()) { + for (int i = 0; i < nodes.size(); i++) { + Iterator it = nodeStatus.keySet().iterator(); + for (int j = 0; j < nodeStatus.keySet().size(); j++) { + if (it.next().equals(nodes.get(i))) { + builder.addMemberOrders(j); + break; + } + } + } + LOG.info("Serialize pipeline {} with nodesInOrder{ }", id.toString(), + nodes); + } return builder.build(); } @@ -164,10 +207,10 @@ public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline) .setState(PipelineState.fromProtobuf(pipeline.getState())) .setNodes(pipeline.getMembersList().stream() .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList())) + .setNodesInOrder(pipeline.getMemberOrdersList()) .build(); } - @Override public boolean equals(Object o) { if (this == o) { @@ -228,6 +271,8 @@ public static class Builder { private ReplicationFactor factor = null; private PipelineState state = null; private Map nodeStatus = null; + private List nodeOrder = null; + private List nodesInOrder = null; public Builder() {} @@ -237,6 +282,7 @@ public Builder(Pipeline pipeline) { this.factor = pipeline.factor; this.state = pipeline.state; this.nodeStatus = pipeline.nodeStatus; + this.nodesInOrder = pipeline.nodesInOrder.get(); } public Builder setId(PipelineID id1) { @@ -265,13 +311,42 @@ public Builder setNodes(List nodes) { return this; } + public Builder setNodesInOrder(List orders) { + this.nodeOrder = orders; + return this; + } + public Pipeline build() { Preconditions.checkNotNull(id); Preconditions.checkNotNull(type); Preconditions.checkNotNull(factor); Preconditions.checkNotNull(state); Preconditions.checkNotNull(nodeStatus); - return new Pipeline(id, type, factor, state, nodeStatus); + Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus); + + if (nodeOrder != null && !nodeOrder.isEmpty()) { + // This branch is for build from ProtoBuf + List nodesWithOrder = new ArrayList<>(); + for(int i = 0; i < nodeOrder.size(); i++) { + int nodeIndex = nodeOrder.get(i); + Iterator it = nodeStatus.keySet().iterator(); + while(it.hasNext() && nodeIndex >= 0) { + DatanodeDetails node = it.next(); + if (nodeIndex == 0) { + nodesWithOrder.add(node); + break; + } + nodeIndex--; + } + } + LOG.info("Deserialize nodesInOrder {} in pipeline {}", nodesWithOrder, + id.toString()); + pipeline.setNodesInOrder(nodesWithOrder); + } else if (nodesInOrder != null){ + // This 
branch is for pipeline clone + pipeline.setNodesInOrder(nodesInOrder); + } + return pipeline; } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java index 12b591210ac..18045f88cbd 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdds.scm.protocol; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.security.KerberosInfo; @@ -74,4 +75,12 @@ List allocateBlock(long size, int numBlocks, * Gets the Clusterid and SCM Id from SCM. */ ScmInfo getScmInfo() throws IOException; + + /** + * Sort datanodes with distance to client. + * @param nodes list of network name of each node. + * @param clientMachine client address, depends, can be hostname or ipaddress. + */ + List sortDatanodes(List nodes, + String clientMachine) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index af53ea1688c..a262bb5bdbd 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.client.ContainerBlockID; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse; @@ -34,6 +35,10 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .SortDatanodesRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .SortDatanodesResponseProto; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -227,6 +232,35 @@ public ScmInfo getScmInfo() throws IOException { return builder.build(); } + /** + * Sort the datanodes based on distance from client. 
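Back on the Pipeline wire format: the sorted order is not shipped as a second datanode list. getProtobufMessage() emits one memberOrders index per sorted node, each pointing into the existing members list, and Builder.build() reverses the mapping on the receiving side. Roughly, as a paraphrase of the logic above rather than new API:

    // Encode: record each sorted node's position in the member list.
    List<DatanodeDetails> members = new ArrayList<>(nodeStatus.keySet());
    for (DatanodeDetails sorted : nodesInOrder.get()) {
      builder.addMemberOrders(members.indexOf(sorted));
    }

    // Decode: map the indices back to nodes, preserving the order.
    List<DatanodeDetails> nodesWithOrder = new ArrayList<>();
    for (int index : pipeline.getMemberOrdersList()) {
      nodesWithOrder.add(members.get(index));
    }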
+ * @return List + * @throws IOException + */ + @Override + public List sortDatanodes(List nodes, + String clientMachine) throws IOException { + SortDatanodesRequestProto request = SortDatanodesRequestProto + .newBuilder() + .addAllNodeNetworkName(nodes) + .setClient(clientMachine) + .build(); + SCMBlockLocationRequest wrapper = createSCMBlockRequest( + Type.SortDatanodes) + .setSortDatanodesRequest(request) + .build(); + + final SCMBlockLocationResponse wrappedResponse = + handleError(submitRequest(wrapper)); + SortDatanodesResponseProto resp = + wrappedResponse.getSortDatanodesResponse(); + List results = new ArrayList<>(resp.getNodeCount()); + results.addAll(resp.getNodeList().stream() + .map(node -> DatanodeDetails.getFromProtoBuf(node)) + .collect(Collectors.toList())); + return results; + } + @Override public Object getUnderlyingProxyObject() { return rpcProxy; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index c29f3959183..d0ba60d9ad7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -230,7 +230,7 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( ReadChunkRequestProto.newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()) .setChunkData(chunk); - String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); + String id = xceiverClient.getPipeline().getClosestNode().getUuidString(); ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk) .setContainerID(blockID.getContainerID()) @@ -494,7 +494,7 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, GetSmallFileRequestProto .newBuilder().setBlock(getBlock) .build(); - String id = client.getPipeline().getFirstNode().getUuidString(); + String id = client.getPipeline().getClosestNode().getUuidString(); ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto .newBuilder() diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java index f6629551e17..2dc8df7d02a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java @@ -35,7 +35,8 @@ public enum SCMAction implements AuditAction { CLOSE_PIPELINE, DELETE_CONTAINER, IN_SAFE_MODE, - FORCE_EXIT_SAFE_MODE; + FORCE_EXIT_SAFE_MODE, + SORT_DATANODE; @Override public String getAction() { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java index 935d240031a..5c3648e6d34 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos; +import 
org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .AllocateBlockResponse; import org.apache.hadoop.hdds.scm.ScmInfo; @@ -50,6 +51,10 @@ .SCMBlockLocationRequest; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .Status; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .SortDatanodesRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .SortDatanodesResponseProto; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; @@ -114,6 +119,10 @@ public SCMBlockLocationResponse send(RpcController controller, response.setGetScmInfoResponse( getScmInfo(request.getGetScmInfoRequest())); break; + case SortDatanodes: + response.setSortDatanodesResponse( + sortDatanodes(request.getSortDatanodesRequest())); + break; default: // Should never happen throw new IOException("Unknown Operation "+request.getCmdType()+ @@ -193,4 +202,21 @@ public HddsProtos.GetScmInfoResponseProto getScmInfo( .setScmId(scmInfo.getScmId()) .build(); } + + public SortDatanodesResponseProto sortDatanodes( + SortDatanodesRequestProto request) throws ServiceException { + SortDatanodesResponseProto.Builder resp = + SortDatanodesResponseProto.newBuilder(); + try { + List nodeList = request.getNodeNetworkNameList(); + final List results = + impl.sortDatanodes(nodeList, request.getClient()); + if (results != null && results.size() > 0) { + results.stream().forEach(dn -> resp.addNode(dn.getProtoBufMessage())); + } + return resp.build(); + } catch (IOException ex) { + throw new ServiceException(ex); + } + } } diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto index 81144ab9f07..ded0d027f6b 100644 --- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto @@ -37,6 +37,7 @@ enum Type { AllocateScmBlock = 11; DeleteScmKeyBlocks = 12; GetScmInfo = 13; + SortDatanodes = 14; } message SCMBlockLocationRequest { @@ -51,6 +52,7 @@ message SCMBlockLocationRequest { optional AllocateScmBlockRequestProto allocateScmBlockRequest = 11; optional DeleteScmKeyBlocksRequestProto deleteScmKeyBlocksRequest = 12; optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest = 13; + optional SortDatanodesRequestProto sortDatanodesRequest = 14; } message SCMBlockLocationResponse { @@ -71,6 +73,7 @@ message SCMBlockLocationResponse { optional AllocateScmBlockResponseProto allocateScmBlockResponse = 11; optional DeleteScmKeyBlocksResponseProto deleteScmKeyBlocksResponse = 12; optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse = 13; + optional SortDatanodesResponseProto sortDatanodesResponse = 14; } /** @@ -185,6 +188,19 @@ message AllocateScmBlockResponseProto { repeated AllocateBlockResponse blocks = 3; } +/** + * Datanode sort request sent by OM to SCM, it contains + * multiple number of datanodes. + */ +message SortDatanodesRequestProto{ + required string client = 1; + repeated string nodeNetworkName = 2; +} + +message SortDatanodesResponseProto{ + repeated DatanodeDetailsProto node = 1; +} + /** * Protocol used from OzoneManager to StorageContainerManager. * See request and response messages for details of the RPC calls. 
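Taken together, the new message pair and the two translators give the Ozone Manager a single call for obtaining a client-relative ordering from SCM. From the caller's side it looks roughly like the following; the handle name and sample addresses are illustrative, sortDatanodes is the new protocol method:

    // networkName of each replica, as registered with SCM (IP or hostname).
    List<String> replicaNames =
        Arrays.asList("10.5.0.4", "10.5.0.5", "10.5.0.6");
    String clientMachine = "10.5.0.70";   // the reading client's address

    List<DatanodeDetails> sorted =
        scmBlockLocationClient.sortDatanodes(replicaNames, clientMachine);
    // sorted.get(0) is the closest replica; the OM side can then attach this
    // order to the pipeline (setNodesInOrder) before handing it to the client.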
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index 2d5cb03ba00..6475f4c901c 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -34,7 +34,9 @@ message DatanodeDetailsProto { required string hostName = 3; // hostname repeated Port ports = 4; optional string certSerialId = 5; // Certificate serial id. - optional string networkLocation = 6; // Network topology location + // network name, can be Ip address or host name, depends + optional string networkName = 6; + optional string networkLocation = 7; // Network topology location } /** @@ -71,6 +73,7 @@ message Pipeline { optional ReplicationType type = 4 [default = STAND_ALONE]; optional ReplicationFactor factor = 5 [default = ONE]; required PipelineID id = 6; + repeated uint32 memberOrders = 7; } message KeyValue { diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 27b02e6957f..7fd4ad0d26b 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -2345,6 +2345,14 @@ awareness document for xml and yaml topology definition samples. + + dfs.network.topology.aware.read.enable + true + OZONE, PERFORMANCE + + Whether to enable topology aware read to improve the read performance. + + ozone.recon.container.db.impl RocksDB diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java index 0edfb07fba2..e0041a4ca59 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java @@ -784,15 +784,16 @@ public void testSortByDistanceCost() { for (Node[] nodeList : nodes) { int length = nodeList.length; while (length > 0) { - cluster.sortByDistanceCost(reader, nodeList, length); - for (int i = 0; i < nodeList.length; i++) { - if ((i + 1) < nodeList.length) { - int cost1 = cluster.getDistanceCost(reader, nodeList[i]); - int cost2 = cluster.getDistanceCost(reader, nodeList[i + 1]); + List ret = cluster.sortByDistanceCost(reader, + Arrays.asList(nodeList), length); + for (int i = 0; i < ret.size(); i++) { + if ((i + 1) < ret.size()) { + int cost1 = cluster.getDistanceCost(reader, ret.get(i)); + int cost2 = cluster.getDistanceCost(reader, ret.get(i + 1)); assertTrue("reader:" + (reader != null ? 
reader.getNetworkFullPath() : "null") + - ",node1:" + nodeList[i].getNetworkFullPath() + - ",node2:" + nodeList[i + 1].getNetworkFullPath() + + ",node1:" + ret.get(i).getNetworkFullPath() + + ",node2:" + ret.get(i + 1).getNetworkFullPath() + ",cost1:" + cost1 + ",cost2:" + cost2, cost1 == Integer.MAX_VALUE || cost1 <= cost2); } @@ -803,20 +804,22 @@ public void testSortByDistanceCost() { } // sort all nodes - Node[] nodeList = dataNodes.clone(); + List nodeList = Arrays.asList(dataNodes.clone()); for (Node reader : readers) { - int length = nodeList.length; + int length = nodeList.size(); while (length >= 0) { - cluster.sortByDistanceCost(reader, nodeList, length); - for (int i = 0; i < nodeList.length; i++) { - if ((i + 1) < nodeList.length) { - int cost1 = cluster.getDistanceCost(reader, nodeList[i]); - int cost2 = cluster.getDistanceCost(reader, nodeList[i + 1]); + List sortedNodeList = + cluster.sortByDistanceCost(reader, nodeList, length); + for (int i = 0; i < sortedNodeList.size(); i++) { + if ((i + 1) < sortedNodeList.size()) { + int cost1 = cluster.getDistanceCost(reader, sortedNodeList.get(i)); + int cost2 = cluster.getDistanceCost( + reader, sortedNodeList.get(i + 1)); // node can be removed when called in testConcurrentAccess assertTrue("reader:" + (reader != null ? reader.getNetworkFullPath() : "null") + - ",node1:" + nodeList[i].getNetworkFullPath() + - ",node2:" + nodeList[i + 1].getNetworkFullPath() + + ",node1:" + sortedNodeList.get(i).getNetworkFullPath() + + ",node2:" + sortedNodeList.get(i + 1).getNetworkFullPath() + ",cost1:" + cost1 + ",cost2:" + cost2, cost1 == Integer.MAX_VALUE || cost1 <= cost2); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 93eba6199d5..6beb0a35b80 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -566,6 +566,8 @@ public DatanodeDetails getNode(String address) { node = clusterMap.getNode(location + NetConstants.PATH_SEPARATOR_STR + address); } + LOG.debug("Get node for {} return {}", address, (node == null ? + "not found" : node.getNetworkFullPath())); return node == null ? 
null : (DatanodeDetails)node; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java index a14d0031f02..2eb9d470f62 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java @@ -25,6 +25,7 @@ import com.google.protobuf.BlockingService; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos; import org.apache.hadoop.hdds.scm.HddsServerUtil; @@ -33,6 +34,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.net.Node; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; import org.apache.hadoop.io.IOUtils; @@ -279,6 +282,35 @@ public ScmInfo getScmInfo() throws IOException { } } + @Override + public List sortDatanodes(List nodes, + String clientMachine) throws IOException { + boolean auditSuccess = true; + try{ + NodeManager nodeManager = scm.getScmNodeManager(); + Node client = nodeManager.getNode(clientMachine); + List nodeList = new ArrayList(); + nodes.stream().forEach(path -> nodeList.add(nodeManager.getNode(path))); + List sortedNodeList = scm.getClusterMap() + .sortByDistanceCost(client, nodeList, nodes.size()); + List ret = new ArrayList<>(); + sortedNodeList.stream().forEach(node -> ret.add((DatanodeDetails)node)); + return ret; + } catch (Exception ex) { + auditSuccess = false; + AUDIT.logReadFailure( + buildAuditMessageForFailure(SCMAction.SORT_DATANODE, null, ex) + ); + throw ex; + } finally { + if(auditSuccess) { + AUDIT.logReadSuccess( + buildAuditMessageForSuccess(SCMAction.SORT_DATANODE, null) + ); + } + } + } + @Override public AuditMessage buildAuditMessageForSuccess( AuditAction op, Map auditMap) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java index b45ce6f8640..9bbabd11ee0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManager; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ReplicationManager; @@ -65,6 +66,7 @@ public final class SCMConfigurator { private SCMSafeModeManager scmSafeModeManager; private CertificateServer certificateServer; private SCMMetadataStore metadataStore; + private NetworkTopology networkTopology; /** * Allows user to specify a version of Node manager to use with this SCM. 
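One behavioural detail in SCMBlockProtocolServer.sortDatanodes() above: if the calling client cannot be resolved to a topology node (an out-of-cluster client, for instance), NodeManager.getNode() returns null and the reworked sortByDistanceCost() then returns the input list unchanged rather than failing, so the caller simply sees the original replica order. A minimal sketch of that behaviour, assuming a populated topology; the rack path is a sample, and the exact generic bounds of sortByDistanceCost are not spelled out above, so the raw List here is an approximation:

    NetworkTopology cluster = new NetworkTopologyImpl(conf);
    // ... datanodes added via cluster.add(datanodeDetails) at registration ...

    Node reader = cluster.getNode("/default-rack/10.5.0.70");  // null if unknown
    List<DatanodeDetails> replicas = pipeline.getNodes();
    List sorted = cluster.sortByDistanceCost(reader, replicas, replicas.size());
    // reader == null -> the replicas come back in their original order
    // reader != null -> closest replica first, equal-cost nodes shuffled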
@@ -137,6 +139,15 @@ public void setMetadataStore(SCMMetadataStore scmMetadataStore) { this.metadataStore = scmMetadataStore; } + /** + * Allows user to specify a custom version of Network Topology Cluster + * to be used with this SCM. + * @param networkTopology - network topology cluster. + */ + public void setNetworkTopology(NetworkTopology networkTopology) { + this.networkTopology = networkTopology; + } + /** * Gets SCM Node Manager. * @return Node Manager. @@ -200,4 +211,12 @@ public CertificateServer getCertificateServer() { public SCMMetadataStore getMetadataStore() { return metadataStore; } + + /** + * Get network topology cluster tree. + * @return NetworkTopology. + */ + public NetworkTopology getNetworkTopology() { + return networkTopology; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 4d02ee480a7..6296df85c0d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -374,7 +374,11 @@ public StorageContainerManager(OzoneConfiguration conf, private void initializeSystemManagers(OzoneConfiguration conf, SCMConfigurator configurator) throws IOException { - clusterMap = new NetworkTopologyImpl(conf); + if (configurator.getNetworkTopology() != null) { + clusterMap = configurator.getNetworkTopology(); + } else { + clusterMap = new NetworkTopologyImpl(conf); + } if(configurator.getScmNodeManager() != null) { scmNodeManager = configurator.getScmNodeManager(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 0b7437e0288..e2ce7de0470 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -124,8 +124,8 @@ public static DatanodeDetails createDatanodeDetails(UUID uuid) { + "." + random.nextInt(256) + "." + random.nextInt(256) + "." 
+ random.nextInt(256); - return createDatanodeDetails(uuid.toString(), "localhost", ipAddress, - null); + return createDatanodeDetails(uuid.toString(), "localhost" + "-" + ipAddress, + ipAddress, null); } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 19fb3a76c6e..64eba29ed96 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -19,6 +19,9 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.net.NetConstants; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.Node; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -83,6 +86,7 @@ public class MockNodeManager implements NodeManager { private final Map> commandMap; private final Node2PipelineMap node2PipelineMap; private final Node2ContainerMap node2ContainerMap; + private NetworkTopology clusterMap; public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { this.healthyNodes = new LinkedList<>(); @@ -366,6 +370,9 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails, try { node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(), Collections.emptySet()); + if (clusterMap != null) { + clusterMap.add(datanodeDetails); + } } catch (SCMException e) { e.printStackTrace(); } @@ -453,7 +460,12 @@ public List getCommandQueue(UUID dnID) { @Override public DatanodeDetails getNode(String address) { - return null; + Node node = clusterMap.getNode(NetConstants.DEFAULT_RACK + "/" + address); + return node == null ? 
null : (DatanodeDetails)node; + } + + public void setNetworkTopology(NetworkTopology topology) { + this.clusterMap = topology; } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index ae810716dab..4657fa03fe5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -51,6 +51,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -1147,6 +1148,15 @@ private void testScmRegisterNodeWithNetworkTopology(boolean useHostname) List nodeList = nodeManager.getAllNodes(); nodeList.stream().forEach(node -> Assert.assertTrue(node.getNetworkLocation().equals("/rack1"))); + + // test get node + if (useHostname) { + Arrays.stream(hostNames).forEach(hostname -> + Assert.assertNotNull(nodeManager.getNode(hostname))); + } else { + Arrays.stream(ipAddress).forEach(ip -> + Assert.assertNotNull(nodeManager.getNode(ip))); + } } } } diff --git a/hadoop-ozone/common/src/main/bin/ozone b/hadoop-ozone/common/src/main/bin/ozone index f6fe147e507..838651c21b1 100755 --- a/hadoop-ozone/common/src/main/bin/ozone +++ b/hadoop-ozone/common/src/main/bin/ozone @@ -136,6 +136,7 @@ function ozonecmd_case ;; sh | shell) HADOOP_CLASSNAME=org.apache.hadoop.ozone.web.ozShell.OzoneShell + HADOOP_OPTS="${HADOOP_OPTS} ${HDFS_OM_SH_OPTS}" OZONE_RUN_ARTIFACT_NAME="hadoop-ozone-ozone-manager" ;; s3) @@ -171,6 +172,7 @@ function ozonecmd_case ;; scmcli) HADOOP_CLASSNAME=org.apache.hadoop.hdds.scm.cli.SCMCLI + HADOOP_OPTS="${HADOOP_OPTS} ${HDFS_SCM_CLI_OPTS}" OZONE_RUN_ARTIFACT_NAME="hadoop-hdds-tools" ;; version) diff --git a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/.env b/hadoop-ozone/dist/src/main/compose/ozone-topology/.env similarity index 100% rename from hadoop-ozone/dist/src/main/compose/ozone-net-topology/.env rename to hadoop-ozone/dist/src/main/compose/ozone-topology/.env diff --git a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-compose.yaml similarity index 90% rename from hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-compose.yaml rename to hadoop-ozone/dist/src/main/compose/ozone-topology/docker-compose.yaml index 4f7d5b2f4b9..b14f398aaac 100644 --- a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-compose.yaml +++ b/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-compose.yaml @@ -19,6 +19,7 @@ services: datanode_1: image: apache/ozone-runner:${HADOOP_RUNNER_VERSION} privileged: true #required by the profiler + container_name: datanode_1 volumes: - ../..:/opt/hadoop ports: @@ -28,11 +29,12 @@ services: env_file: - ./docker-config networks: - service_network: + net: ipv4_address: 10.5.0.4 datanode_2: image: apache/ozone-runner:${HADOOP_RUNNER_VERSION} privileged: true #required by the profiler + container_name: datanode_2 volumes: - ../..:/opt/hadoop ports: @@ -42,11 +44,12 @@ services: env_file: - ./docker-config networks: - service_network: + net: ipv4_address: 10.5.0.5 datanode_3: image: apache/ozone-runner:${HADOOP_RUNNER_VERSION} privileged: true #required by the profiler + container_name: datanode_3 volumes: - ../..:/opt/hadoop ports: 
@@ -56,11 +59,12 @@ services: env_file: - ./docker-config networks: - service_network: + net: ipv4_address: 10.5.0.6 datanode_4: image: apache/ozone-runner:${HADOOP_RUNNER_VERSION} privileged: true #required by the profiler + container_name: datanode_4 volumes: - ../..:/opt/hadoop ports: @@ -70,11 +74,12 @@ services: env_file: - ./docker-config networks: - service_network: + net: ipv4_address: 10.5.0.7 om: image: apache/ozone-runner:${HADOOP_RUNNER_VERSION} privileged: true #required by the profiler + container_name: om volumes: - ../..:/opt/hadoop ports: @@ -85,11 +90,12 @@ services: - ./docker-config command: ["/opt/hadoop/bin/ozone","om"] networks: - service_network: + net: ipv4_address: 10.5.0.70 scm: - image: apache/ozone-runner::${HADOOP_RUNNER_VERSION} + image: apache/ozone-runner:${HADOOP_RUNNER_VERSION} privileged: true #required by the profiler + container_name: scm volumes: - ../..:/opt/hadoop ports: @@ -100,10 +106,10 @@ services: ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION command: ["/opt/hadoop/bin/ozone","scm"] networks: - service_network: + net: ipv4_address: 10.5.0.71 networks: - service_network: + net: driver: bridge ipam: config: diff --git a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-config similarity index 96% rename from hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-config rename to hadoop-ozone/dist/src/main/compose/ozone-topology/docker-config index ea98240ef90..f5f8f3fdbbc 100644 --- a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-config @@ -29,7 +29,8 @@ OZONE-SITE.XML_hdds.profiler.endpoint.enabled=true HDFS-SITE.XML_rpc.metrics.quantile.enable=true HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300 HDFS-SITE.XML_net.topology.node.switch.mapping.impl=org.apache.hadoop.net.TableMapping -HDFS-SITE.XML_net.topology.table.file.name=/opt/hadoop/compose/ozone-net-topology/network-config +HDFS-SITE.XML_net.topology.table.file.name=/opt/hadoop/compose/ozone-topology/network-config +HDFS-SITE.XML_dfs.network.topology.aware.read.enable=true ASYNC_PROFILER_HOME=/opt/profiler LOG4J.PROPERTIES_log4j.rootLogger=DEBUG, ARF LOG4J.PROPERTIES_log4j.appender.stdout=org.apache.log4j.ConsoleAppender @@ -46,6 +47,8 @@ LOG4J.PROPERTIES_log4j.appender.ARF.file=/opt/hadoop/logs/${module.name}-${user. HDDS_DN_OPTS=-Dmodule.name=datanode HDFS_OM_OPTS=-Dmodule.name=om HDFS_STORAGECONTAINERMANAGER_OPTS=-Dmodule.name=scm +HDFS_OM_SH_OPTS=-Dmodule.name=sh +HDFS_SCM_CLI_OPTS=-Dmodule.name=scmcli #Enable this variable to print out all hadoop rpc traffic to the stdout. See http://byteman.jboss.org/ to define your own instrumentation. 
#BYTEMAN_SCRIPT_URL=https://raw.githubusercontent.com/apache/hadoop/trunk/dev-support/byteman/hadooprpc.btm diff --git a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/network-config b/hadoop-ozone/dist/src/main/compose/ozone-topology/network-config similarity index 100% rename from hadoop-ozone/dist/src/main/compose/ozone-net-topology/network-config rename to hadoop-ozone/dist/src/main/compose/ozone-topology/network-config diff --git a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/test.sh b/hadoop-ozone/dist/src/main/compose/ozone-topology/test.sh similarity index 100% rename from hadoop-ozone/dist/src/main/compose/ozone-net-topology/test.sh rename to hadoop-ozone/dist/src/main/compose/ozone-topology/test.sh diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index 868e04a83b9..2b510fce539 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.StorageType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -100,9 +101,11 @@ import org.apache.commons.lang3.RandomUtils; import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; +import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE; import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.either; + import org.junit.Assert; import static org.junit.Assert.assertEquals; @@ -114,6 +117,8 @@ import static org.junit.Assert.fail; import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is an abstract class to test all the public facing APIs of Ozone @@ -124,6 +129,8 @@ */ public abstract class TestOzoneRpcClientAbstract { + static final Logger LOG = + LoggerFactory.getLogger(TestOzoneRpcClientAbstract.class); private static MiniOzoneCluster cluster = null; private static OzoneClient ozClient = null; private static ObjectStore store = null; @@ -140,7 +147,7 @@ public abstract class TestOzoneRpcClientAbstract { */ static void startCluster(OzoneConfiguration conf) throws Exception { cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(10) + .setNumDatanodes(3) .setScmId(scmId) .build(); cluster.waitForClusterToBeReady(); @@ -664,6 +671,80 @@ public void testValidateBlockLengthWithCommitKey() throws IOException { Assert.assertEquals(value.getBytes().length, keyInfo.getDataSize()); } + /** + * Tests get the information of key with network topology awareness enabled. 
+ * @throws IOException + */ + @Test + public void testGetKeyAndFileWithNetworkTopology() throws IOException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + String value = "sample value"; + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + String keyName = UUID.randomUUID().toString(); + + // Write data into a key + OzoneOutputStream out = bucket.createKey(keyName, + value.getBytes().length, ReplicationType.RATIS, + THREE, new HashMap<>()); + out.write(value.getBytes()); + out.close(); + + // Since the rpc client is outside of cluster, then getFirstNode should be + // equal to getClosestNode. + OmKeyArgs.Builder builder = new OmKeyArgs.Builder(); + builder.setVolumeName(volumeName).setBucketName(bucketName) + .setKeyName(keyName).setRefreshPipeline(true); + + // read key with topology aware read enabled(default) + try { + OzoneInputStream is = bucket.readKey(keyName); + byte[] b = new byte[value.getBytes().length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, value.getBytes())); + } catch (OzoneChecksumException e) { + fail("Reading key should success"); + } + // read file with topology aware read enabled(default) + try { + OzoneInputStream is = bucket.readFile(keyName); + byte[] b = new byte[value.getBytes().length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, value.getBytes())); + } catch (OzoneChecksumException e) { + fail("Reading file should success"); + } + + // read key with topology aware read disabled + Configuration conf = cluster.getConf(); + conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false"); + OzoneClient newClient = OzoneClientFactory.getRpcClient(conf); + ObjectStore newStore = newClient.getObjectStore(); + OzoneBucket newBucket = + newStore.getVolume(volumeName).getBucket(bucketName); + try { + OzoneInputStream is = newBucket.readKey(keyName); + byte[] b = new byte[value.getBytes().length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, value.getBytes())); + } catch (OzoneChecksumException e) { + fail("Reading key should success"); + } + // read file with topology aware read disabled + + try { + OzoneInputStream is = newBucket.readFile(keyName); + byte[] b = new byte[value.getBytes().length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, value.getBytes())); + } catch (OzoneChecksumException e) { + fail("Reading file should success"); + } + } @Test public void testPutKeyRatisOneNode() diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java index 7b0b8c4c6d8..dc6e40744a1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java @@ -19,18 +19,37 @@ package org.apache.hadoop.ozone.client.rpc; import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.UUID; + +import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import 
@Test public void testPutKeyRatisOneNode() diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java index 7b0b8c4c6d8..dc6e40744a1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java @@ -19,18 +19,37 @@ package org.apache.hadoop.ozone.client.rpc; import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.UUID; + +import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE; +import static org.junit.Assert.fail; /** * This class is to test all the public facing APIs of Ozone Client with an * active OM Ratis server. */ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract { - + private static OzoneConfiguration conf; /** * Create a MiniOzoneCluster for testing. * Ozone is made active by setting OZONE_ENABLED = true. @@ -41,7 +60,7 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract { */ @BeforeClass public static void init() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); + conf = new OzoneConfiguration(); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1); conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); startCluster(conf); @@ -55,4 +74,77 @@ public static void shutdown() throws IOException { shutdownCluster(); } + /** + * Tests get the information of key with network topology awareness enabled. + * @throws IOException + */ + @Test + public void testGetKeyAndFileWithNetworkTopology() throws IOException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + String value = "sample value"; + getStore().createVolume(volumeName); + OzoneVolume volume = getStore().getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + String keyName = UUID.randomUUID().toString(); + + // Write data into a key + OzoneOutputStream out = bucket.createKey(keyName, + value.getBytes().length, ReplicationType.RATIS, + THREE, new HashMap<>()); + out.write(value.getBytes()); + out.close(); + + // Since the RPC client is outside of the cluster, getFirstNode should be + // equal to getClosestNode.
+ OmKeyArgs.Builder builder = new OmKeyArgs.Builder(); + builder.setVolumeName(volumeName).setBucketName(bucketName) + .setKeyName(keyName).setRefreshPipeline(true); + + // read key with topology aware read enabled (default) + try { + OzoneInputStream is = bucket.readKey(keyName); + byte[] b = new byte[value.getBytes().length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, value.getBytes())); + } catch (OzoneChecksumException e) { + fail("Reading key should succeed"); + } + // read file with topology aware read enabled (default) + try { + OzoneInputStream is = bucket.readFile(keyName); + byte[] b = new byte[value.getBytes().length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, value.getBytes())); + } catch (OzoneChecksumException e) { + fail("Reading file should succeed"); + } + + // read key with topology aware read disabled + conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false"); + OzoneClient newClient = OzoneClientFactory.getRpcClient(conf); + ObjectStore newStore = newClient.getObjectStore(); + OzoneBucket newBucket = + newStore.getVolume(volumeName).getBucket(bucketName); + try { + OzoneInputStream is = newBucket.readKey(keyName); + byte[] b = new byte[value.getBytes().length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, value.getBytes())); + } catch (OzoneChecksumException e) { + fail("Reading key should succeed"); + } + // read file with topology aware read disabled + + try { + OzoneInputStream is = newBucket.readFile(keyName); + byte[] b = new byte[value.getBytes().length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, value.getBytes())); + } catch (OzoneChecksumException e) { + fail("Reading file should succeed"); + } + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java index e9e6b2504a8..0aa301ae8fd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java @@ -36,7 +36,9 @@ import org.apache.commons.lang.RandomStringUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.TestUtils; @@ -44,7 +46,12 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; +import org.apache.hadoop.hdds.scm.net.NodeSchema; +import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.server.SCMConfigurator; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -53,6 +60,7 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import 
org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; @@ -71,6 +79,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -80,6 +89,11 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; + +import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA; + import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL; /** @@ -91,6 +105,7 @@ public class TestKeyManagerImpl { private static KeyManagerImpl keyManager; private static VolumeManagerImpl volumeManager; private static BucketManagerImpl bucketManager; + private static NodeManager nodeManager; private static StorageContainerManager scm; private static ScmBlockLocationProtocol mockScmBlockLocationProtocol; private static OzoneConfiguration conf; @@ -113,9 +128,17 @@ public static void setUp() throws Exception { metadataManager = new OmMetadataManagerImpl(conf); volumeManager = new VolumeManagerImpl(metadataManager, conf); bucketManager = new BucketManagerImpl(metadataManager); - NodeManager nodeManager = new MockNodeManager(true, 10); + nodeManager = new MockNodeManager(true, 10); + NodeSchema[] schemas = new NodeSchema[] + {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA}; + NodeSchemaManager schemaManager = NodeSchemaManager.getInstance(); + schemaManager.init(schemas, false); + NetworkTopology clusterMap = new NetworkTopologyImpl(schemaManager); + nodeManager.getAllNodes().stream().forEach(node -> clusterMap.add(node)); + ((MockNodeManager)nodeManager).setNetworkTopology(clusterMap); SCMConfigurator configurator = new SCMConfigurator(); configurator.setScmNodeManager(nodeManager); + configurator.setNetworkTopology(clusterMap); scm = TestUtils.getScm(conf, configurator); scm.start(); scm.exitSafeMode(); @@ -563,7 +586,7 @@ public void testLookupFile() throws IOException { // lookup for a non-existent file try { - keyManager.lookupFile(keyArgs); + keyManager.lookupFile(keyArgs, null); Assert.fail("Lookup file should fail for non existent file"); } catch (OMException ex) { if (ex.getResult() != OMException.ResultCodes.FILE_NOT_FOUND) { @@ -576,14 +599,15 @@ public void testLookupFile() throws IOException { keyArgs.setLocationInfoList( keySession.getKeyInfo().getLatestVersionLocations().getLocationList()); keyManager.commitKey(keyArgs, keySession.getId()); - Assert.assertEquals(keyManager.lookupFile(keyArgs).getKeyName(), keyName); + Assert.assertEquals(keyManager.lookupFile(keyArgs, null).getKeyName(), + keyName); // lookup for created file keyArgs = createBuilder() .setKeyName("") .build(); try { - keyManager.lookupFile(keyArgs); + keyManager.lookupFile(keyArgs, null); Assert.fail("Lookup file should fail for a directory"); } catch (OMException ex) { if (ex.getResult() != OMException.ResultCodes.NOT_A_FILE) { @@ -596,6 +620,81 @@ private OmKeyArgs createKeyArgs(String toKeyName) throws IOException { return 
createBuilder().setKeyName(toKeyName).build(); } + @Test + public void testLookupKeyWithLocation() throws IOException { + String keyName = RandomStringUtils.randomAlphabetic(5); + OmKeyArgs keyArgs = createBuilder() + .setKeyName(keyName) + .build(); + + // lookup for a non-existent key + try { + keyManager.lookupKey(keyArgs, null); + Assert.fail("Lookup key should fail for non existent key"); + } catch (OMException ex) { + if (ex.getResult() != OMException.ResultCodes.KEY_NOT_FOUND) { + throw ex; + } + } + + // create a key + OpenKeySession keySession = keyManager.createFile(keyArgs, false, false); + // randomly select 3 datanodes + List nodeList = new ArrayList<>(); + nodeList.add((DatanodeDetails)scm.getClusterMap().getNode( + 0, null, null, null, null, 0)); + nodeList.add((DatanodeDetails)scm.getClusterMap().getNode( + 1, null, null, null, null, 0)); + nodeList.add((DatanodeDetails)scm.getClusterMap().getNode( + 2, null, null, null, null, 0)); + Assume.assumeFalse(nodeList.get(0).equals(nodeList.get(1))); + Assume.assumeFalse(nodeList.get(0).equals(nodeList.get(2))); + // create a pipeline using 3 datanodes + Pipeline pipeline = scm.getPipelineManager().createPipeline( + ReplicationType.RATIS, ReplicationFactor.THREE, nodeList); + List locationInfoList = new ArrayList<>(); + locationInfoList.add( + new OmKeyLocationInfo.Builder().setPipeline(pipeline) + .setBlockID(new BlockID(1L, 1L)).build()); + keyArgs.setLocationInfoList(locationInfoList); + + keyManager.commitKey(keyArgs, keySession.getId()); + + OmKeyInfo key = keyManager.lookupKey(keyArgs, null); + Assert.assertEquals(key.getKeyName(), keyName); + List keyLocations = + key.getLatestVersionLocations().getLocationList(); + DatanodeDetails leader = + keyLocations.get(0).getPipeline().getFirstNode(); + DatanodeDetails follower1 = + keyLocations.get(0).getPipeline().getNodes().get(1); + DatanodeDetails follower2 = + keyLocations.get(0).getPipeline().getNodes().get(2); + Assert.assertNotEquals(leader, follower1); + Assert.assertNotEquals(follower1, follower2); + + // lookup key, leader as client + OmKeyInfo key1 = keyManager.lookupKey(keyArgs, leader.getNetworkName()); + Assert.assertEquals(leader, key1.getLatestVersionLocations() + .getLocationList().get(0).getPipeline().getClosestNode()); + + // lookup key, follower1 as client + OmKeyInfo key2 = keyManager.lookupKey(keyArgs, follower1.getNetworkName()); + Assert.assertEquals(follower1, key2.getLatestVersionLocations() + .getLocationList().get(0).getPipeline().getClosestNode()); + + // lookup key, follower2 as client + OmKeyInfo key3 = keyManager.lookupKey(keyArgs, follower2.getNetworkName()); + Assert.assertEquals(follower2, key3.getLatestVersionLocations() + .getLocationList().get(0).getPipeline().getClosestNode()); + + // lookup key, random node as client + OmKeyInfo key4 = keyManager.lookupKey(keyArgs, + "/d=default-drack/127.0.0.1"); + Assert.assertEquals(leader, key4.getLatestVersionLocations() + .getLocationList().get(0).getPipeline().getClosestNode()); + } + @Test public void testListStatus() throws IOException { String superDir = RandomStringUtils.randomAlphabetic(5); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java index 5d739c2988a..a35c4742a71 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java @@ -258,7 +258,7 @@ public void testKeyOps() throws IOException { Mockito.doReturn(null).when(mockKm).openKey(null); Mockito.doNothing().when(mockKm).deleteKey(null); - Mockito.doReturn(null).when(mockKm).lookupKey(null); + Mockito.doReturn(null).when(mockKm).lookupKey(null, ""); Mockito.doReturn(null).when(mockKm).listKeys(null, null, null, null, 0); Mockito.doNothing().when(mockKm).commitKey(any(OmKeyArgs.class), anyLong()); Mockito.doReturn(null).when(mockKm).initiateMultipartUpload( @@ -293,7 +293,7 @@ public void testKeyOps() throws IOException { // inject exception to test for Failure Metrics Mockito.doThrow(exception).when(mockKm).openKey(null); Mockito.doThrow(exception).when(mockKm).deleteKey(null); - Mockito.doThrow(exception).when(mockKm).lookupKey(null); + Mockito.doThrow(exception).when(mockKm).lookupKey(null, ""); Mockito.doThrow(exception).when(mockKm).listKeys( null, null, null, null, 0); Mockito.doThrow(exception).when(mockKm).commitKey(any(OmKeyArgs.class), diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 1259f715d66..1f8eb731c8f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -127,10 +127,12 @@ void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID) * DistributedStorageHandler will use to access the data on datanode. * * @param args the args of the key provided by client. + * @param clientAddress a hint to the key manager to order the datanodes in the + * returned pipeline by distance between the client and each datanode. * @return a OmKeyInfo instance client uses to talk to container. * @throws IOException */ - OmKeyInfo lookupKey(OmKeyArgs args) throws IOException; + OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress) throws IOException; /** * Renames an existing key within a bucket. 
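For context, a rough caller-side sketch of the new two-argument lookup (illustrative only; it assumes a KeyManager instance wired to an SCM block client that implements sortDatanodes, plus pre-existing volumeName/bucketName/keyName values, and the clientAddress value shown is just an example):

    // Build the key args the same way the RPC handlers do.
    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
        .setVolumeName(volumeName)
        .setBucketName(bucketName)
        .setKeyName(keyName)
        .setRefreshPipeline(true)
        .build();
    // The second argument is the reader's network location, e.g. the RPC
    // caller address; an empty string means "no hint" and leaves the
    // pipeline order untouched.
    String clientAddress = "/rack-1/127.0.0.1";
    OmKeyInfo keyInfo = keyManager.lookupKey(keyArgs, clientAddress);
    // With a hint, each block's pipeline comes back distance-ordered, so the
    // closest replica can be read first.
    DatanodeDetails closest = keyInfo.getLatestVersionLocations()
        .getLocationList().get(0).getPipeline().getClosestNode();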
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 99e3e660f64..222b1490a9e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; @@ -663,7 +664,8 @@ public void commitKey(OmKeyArgs args, long clientID) throws IOException { } @Override - public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { + public OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress) + throws IOException { Preconditions.checkNotNull(args); String volumeName = args.getVolumeName(); String bucketName = args.getBucketName(); @@ -718,6 +720,7 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { }); } } + sortDatanodeInPipeline(value, clientAddress); return value; } catch (IOException ex) { LOG.debug("Get key failed for volume:{} bucket:{} key:{}", @@ -1855,7 +1858,8 @@ public OpenKeySession createFile(OmKeyArgs args, boolean isOverWrite, * invalid arguments */ @Override - public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException { + public OmKeyInfo lookupFile(OmKeyArgs args, String clientAddress) + throws IOException { Preconditions.checkNotNull(args, "Key args can not be null"); String volumeName = args.getVolumeName(); String bucketName = args.getBucketName(); @@ -1865,6 +1869,7 @@ public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException { try { OzoneFileStatus fileStatus = getFileStatus(args); if (fileStatus.isFile()) { + sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress); return fileStatus.getKeyInfo(); } //if key is not of type file or if key is not found we throw an exception @@ -2052,4 +2057,31 @@ private FileEncryptionInfo getFileEncryptionInfo(OmBucketInfo bucketInfo) return encInfo; } + private void sortDatanodeInPipeline(OmKeyInfo keyInfo, String clientMachine) { + if (keyInfo != null && clientMachine != null && !clientMachine.isEmpty()) { + for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) { + key.getLocationList().forEach(k -> { + List nodes = k.getPipeline().getNodes(); + List nodeList = new ArrayList<>(); + nodes.stream().forEach(node -> + nodeList.add(node.getNetworkName())); + try { + List sortedNodes = scmClient.getBlockClient() + .sortDatanodes(nodeList, clientMachine); + k.getPipeline().setNodesInOrder(sortedNodes); + LOG.debug("Sort datanodes {} for client {}, return {}", nodes, + clientMachine, sortedNodes); + } catch (IOException e) { + LOG.warn("Unable to sort datanodes based on distance to " + + "client, volume=" + keyInfo.getVolumeName() + + ", bucket=" + keyInfo.getBucketName() + + ", key=" + keyInfo.getKeyName() + + ", client=" + clientMachine + + ", datanodes=" + nodes.toString() + + ", exception=" + e.getMessage()); + } + }); + } + } + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java 
index 8032b6d06c1..7b2a83fe54f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -2347,7 +2347,7 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { boolean auditSuccess = true; try { metrics.incNumKeyLookups(); - return keyManager.lookupKey(args); + return keyManager.lookupKey(args, getClientAddress()); } catch (Exception ex) { metrics.incNumKeyLookupFails(); auditSuccess = false; @@ -2550,6 +2550,14 @@ private void unregisterMXBean() { } } + private static String getClientAddress() { + String clientMachine = Server.getRemoteAddress(); + if (clientMachine == null) { // not an RPC client + clientMachine = ""; + } + return clientMachine; + } + @Override public String getRpcPort() { return "" + omRpcAddress.getPort(); @@ -2975,7 +2983,7 @@ public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException { boolean auditSuccess = true; try { metrics.incNumLookupFile(); - return keyManager.lookupFile(args); + return keyManager.lookupFile(args, getClientAddress()); } catch (Exception ex) { metrics.incNumLookupFileFails(); auditSuccess = false; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java index bff883dc7fe..647931af0d0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java @@ -38,7 +38,16 @@ public interface OzoneManagerFS extends IOzoneAcl { OpenKeySession createFile(OmKeyArgs args, boolean isOverWrite, boolean isRecursive) throws IOException; - OmKeyInfo lookupFile(OmKeyArgs args) throws IOException; + /** + * Look up a file and return its info to the client side. + * + * @param args the args of the key provided by client. + * @param clientAddress a hint to the key manager to order the datanodes in the + * returned pipeline by distance between the client and each datanode. + * @return an OmKeyInfo instance client uses to talk to container. + * @throws IOException + */ + OmKeyInfo lookupFile(OmKeyArgs args, String clientAddress) throws IOException; List listStatus(OmKeyArgs keyArgs, boolean recursive, String startKey, long numEntries) throws IOException; diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java index e546d79cb70..b8534de5f5b 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java @@ -182,6 +182,12 @@ public ScmInfo getScmInfo() throws IOException { return builder.build(); } + @Override + public List sortDatanodes(List nodes, + String clientMachine) throws IOException { + return null; + } + @Override public void close() throws IOException {
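One related hardening worth sketching (illustrative, not something this patch does): since test doubles like the one above may return null from sortDatanodes, the sorting helper in KeyManagerImpl shown earlier could guard the result before overriding the pipeline order, roughly:

          List<DatanodeDetails> sortedNodes = scmClient.getBlockClient()
              .sortDatanodes(nodeList, clientMachine);
          // Only override the pipeline order when SCM actually returned a
          // sorted list; otherwise keep the original node order.
          if (sortedNodes != null && !sortedNodes.isEmpty()) {
            k.getPipeline().setNodesInOrder(sortedNodes);
          }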