diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 22a4787d71d..b3b35f1c617 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -124,10 +124,6 @@ public final class OzoneConfigKeys {
public static final Class extends ClientProtocol>
OZONE_CLIENT_PROTOCOL_REST = RestClient.class;
- public static final String OZONE_REST_SERVERS = "ozone.rest.servers";
- public static final String OZONE_REST_CLIENT_PORT = "ozone.rest.client.port";
- public static final int OZONE_REST_CLIENT_PORT_DEFAULT = 9864;
-
// This defines the overall connection limit for the connection pool used in
// RestClient.
public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java
index 389035af109..193d80dcaf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.client;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
@@ -34,6 +35,12 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_PROTOCOL_REST;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_PROTOCOL_RPC;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+ .OZONE_KSM_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+ .OZONE_KSM_HTTP_BIND_PORT_DEFAULT;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_PORT_DEFAULT;
/**
* Factory class to create different types of OzoneClients.
@@ -49,112 +56,240 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
*/
public final class OzoneClientFactory {
- private enum ClientType {
- RPC, REST
- }
+ private static final Logger LOG = LoggerFactory.getLogger(
+ OzoneClientFactory.class);
/**
* Private constructor, class is not meant to be initialized.
*/
private OzoneClientFactory(){}
- private static final Logger LOG = LoggerFactory.getLogger(
- OzoneClientFactory.class);
-
- private static Configuration configuration;
/**
- * Returns an OzoneClient which will use protocol defined through
- * ozone.client.protocol
to perform client operations.
+ * Constructs and return an OzoneClient with default configuration.
+ *
* @return OzoneClient
+ *
* @throws IOException
*/
public static OzoneClient getClient() throws IOException {
- return getClient(null);
+ LOG.info("Creating OzoneClient with default configuration.");
+ return getClient(new OzoneConfiguration());
}
/**
- * Returns an OzoneClient which will use RPC protocol to perform
- * client operations.
+ * Constructs and return an OzoneClient based on the configuration object.
+ * Protocol type is decided by ozone.client.protocol
.
+ *
+ * @param config
+ * Configuration to be used for OzoneClient creation
+ *
* @return OzoneClient
+ *
* @throws IOException
*/
- public static OzoneClient getRpcClient() throws IOException {
- return getClient(ClientType.RPC);
- }
-
- /**
- * Returns an OzoneClient which will use REST protocol to perform
- * client operations.
- * @return OzoneClient
- * @throws IOException
- */
- public static OzoneClient getRestClient() throws IOException {
- return getClient(ClientType.REST);
- }
-
- /**
- * Returns OzoneClient with protocol type set base on ClientType.
- * @param clientType
- * @return OzoneClient
- * @throws IOException
- */
- private static OzoneClient getClient(ClientType clientType)
+ public static OzoneClient getClient(Configuration config)
throws IOException {
+ Preconditions.checkNotNull(config);
+ Class extends ClientProtocol> clazz = (Class extends ClientProtocol>)
+ config.getClass(OZONE_CLIENT_PROTOCOL, OZONE_CLIENT_PROTOCOL_RPC);
+ return getClient(getClientProtocol(clazz, config), config);
+ }
+
+ /**
+ * Returns an OzoneClient which will use RPC protocol.
+ *
+ * @param ksmHost
+ * hostname of KeySpaceManager to connect.
+ *
+ * @return OzoneClient
+ *
+ * @throws IOException
+ */
+ public static OzoneClient getRpcClient(String ksmHost)
+ throws IOException {
+ return getRpcClient(ksmHost, OZONE_KSM_PORT_DEFAULT,
+ new OzoneConfiguration());
+ }
+
+ /**
+ * Returns an OzoneClient which will use RPC protocol.
+ *
+ * @param ksmHost
+ * hostname of KeySpaceManager to connect.
+ *
+ * @param ksmRpcPort
+ * RPC port of KeySpaceManager.
+ *
+ * @return OzoneClient
+ *
+ * @throws IOException
+ */
+ public static OzoneClient getRpcClient(String ksmHost, Integer ksmRpcPort)
+ throws IOException {
+ return getRpcClient(ksmHost, ksmRpcPort, new OzoneConfiguration());
+ }
+
+ /**
+ * Returns an OzoneClient which will use RPC protocol.
+ *
+ * @param ksmHost
+ * hostname of KeySpaceManager to connect.
+ *
+ * @param ksmRpcPort
+ * RPC port of KeySpaceManager.
+ *
+ * @param config
+ * Configuration to be used for OzoneClient creation
+ *
+ * @return OzoneClient
+ *
+ * @throws IOException
+ */
+ public static OzoneClient getRpcClient(String ksmHost, Integer ksmRpcPort,
+ Configuration config)
+ throws IOException {
+ Preconditions.checkNotNull(ksmHost);
+ Preconditions.checkNotNull(ksmRpcPort);
+ Preconditions.checkNotNull(config);
+ config.set(OZONE_KSM_ADDRESS_KEY, ksmHost + ":" + ksmRpcPort);
+ return getRpcClient(config);
+ }
+
+ /**
+ * Returns an OzoneClient which will use RPC protocol.
+ *
+ * @param config
+ * used for OzoneClient creation
+ *
+ * @return OzoneClient
+ *
+ * @throws IOException
+ */
+ public static OzoneClient getRpcClient(Configuration config)
+ throws IOException {
+ Preconditions.checkNotNull(config);
+ return getClient(getClientProtocol(OZONE_CLIENT_PROTOCOL_RPC, config),
+ config);
+ }
+
+ /**
+ * Returns an OzoneClient which will use REST protocol.
+ *
+ * @param ksmHost
+ * hostname of KeySpaceManager to connect.
+ *
+ * @return OzoneClient
+ *
+ * @throws IOException
+ */
+ public static OzoneClient getRestClient(String ksmHost)
+ throws IOException {
+ return getRestClient(ksmHost, OZONE_KSM_HTTP_BIND_PORT_DEFAULT);
+ }
+
+ /**
+ * Returns an OzoneClient which will use REST protocol.
+ *
+ * @param ksmHost
+ * hostname of KeySpaceManager to connect.
+ *
+ * @param ksmHttpPort
+ * HTTP port of KeySpaceManager.
+ *
+ * @return OzoneClient
+ *
+ * @throws IOException
+ */
+ public static OzoneClient getRestClient(String ksmHost, Integer ksmHttpPort)
+ throws IOException {
+ return getRestClient(ksmHost, ksmHttpPort, new OzoneConfiguration());
+ }
+
+ /**
+ * Returns an OzoneClient which will use REST protocol.
+ *
+ * @param ksmHost
+ * hostname of KeySpaceManager to connect.
+ *
+ * @param ksmHttpPort
+ * HTTP port of KeySpaceManager.
+ *
+ * @param config
+ * Configuration to be used for OzoneClient creation
+ *
+ * @return OzoneClient
+ *
+ * @throws IOException
+ */
+ public static OzoneClient getRestClient(String ksmHost, Integer ksmHttpPort,
+ Configuration config)
+ throws IOException {
+ Preconditions.checkNotNull(ksmHost);
+ Preconditions.checkNotNull(ksmHttpPort);
+ Preconditions.checkNotNull(config);
+ config.set(OZONE_KSM_HTTP_ADDRESS_KEY, ksmHost + ":" + ksmHttpPort);
+ return getRestClient(config);
+ }
+
+ /**
+ * Returns an OzoneClient which will use REST protocol.
+ *
+ * @param config
+ * Configuration to be used for OzoneClient creation
+ *
+ * @return OzoneClient
+ *
+ * @throws IOException
+ */
+ public static OzoneClient getRestClient(Configuration config)
+ throws IOException {
+ Preconditions.checkNotNull(config);
+ return getClient(getClientProtocol(OZONE_CLIENT_PROTOCOL_REST, config),
+ config);
+ }
+
+ /**
+ * Creates OzoneClient with the given ClientProtocol and Configuration.
+ *
+ * @param clientProtocol
+ * Protocol to be used by the OzoneClient
+ *
+ * @param config
+ * Configuration to be used for OzoneClient creation
+ */
+ private static OzoneClient getClient(ClientProtocol clientProtocol,
+ Configuration config) {
OzoneClientInvocationHandler clientHandler =
- new OzoneClientInvocationHandler(getProtocolClass(clientType));
+ new OzoneClientInvocationHandler(clientProtocol);
ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
OzoneClientInvocationHandler.class.getClassLoader(),
new Class>[]{ClientProtocol.class}, clientHandler);
- return new OzoneClient(configuration, proxy);
+ return new OzoneClient(config, proxy);
}
/**
- * Returns the configuration if it's already set, else creates a new
- * {@link OzoneConfiguration} and returns it.
+ * Returns an instance of Protocol class.
+ *
+ * @param protocolClass
+ * Class object of the ClientProtocol.
+ *
+ * @param config
+ * Configuration used to initialize ClientProtocol.
+ *
+ * @return ClientProtocol
*
- * @return Configuration
- */
- private static synchronized Configuration getConfiguration() {
- if(configuration == null) {
- setConfiguration(new OzoneConfiguration());
- }
- return configuration;
- }
-
- /**
- * Based on the clientType, client protocol instance is created.
- * If clientType is null, ozone.client.protocol
property
- * will be used to decide the protocol to be used.
- * @param clientType type of client protocol to be created
- * @return ClientProtocol implementation
* @throws IOException
*/
- private static ClientProtocol getProtocolClass(ClientType clientType)
+ private static ClientProtocol getClientProtocol(
+ Class extends ClientProtocol> protocolClass, Configuration config)
throws IOException {
- Class extends ClientProtocol> protocolClass = null;
- if(clientType != null) {
- switch (clientType) {
- case RPC:
- protocolClass = OZONE_CLIENT_PROTOCOL_RPC;
- break;
- case REST:
- protocolClass = OZONE_CLIENT_PROTOCOL_REST;
- break;
- default:
- LOG.warn("Invalid ClientProtocol type, falling back to RPC.");
- protocolClass = OZONE_CLIENT_PROTOCOL_RPC;
- break;
- }
- } else {
- protocolClass = (Class)
- getConfiguration().getClass(
- OZONE_CLIENT_PROTOCOL, OZONE_CLIENT_PROTOCOL_RPC);
- }
try {
+ LOG.info("Using {} as client protocol.",
+ protocolClass.getCanonicalName());
Constructor extends ClientProtocol> ctor =
protocolClass.getConstructor(Configuration.class);
- return ctor.newInstance(getConfiguration());
+ return ctor.newInstance(config);
} catch (Exception e) {
final String message = "Couldn't create protocol " + protocolClass;
if (e.getCause() instanceof IOException) {
@@ -165,13 +300,4 @@ public final class OzoneClientFactory {
}
}
- /**
- * Sets the configuration, which will be used while creating OzoneClient.
- *
- * @param conf
- */
- public static void setConfiguration(Configuration conf) {
- configuration = conf;
- }
-
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java
new file mode 100644
index 00000000000..93b3417b4b5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rest;
+
+import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
+
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Default selector randomly picks one of the REST Server from the list.
+ */
+public class DefaultRestServerSelector implements RestServerSelector {
+
+ @Override
+ public ServiceInfo getRestServer(List restServices) {
+ return restServices.get(
+ new Random().nextInt(restServices.size()));
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
index aabaa32de04..56fadd24f2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.ozone.client.rest;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -36,14 +39,16 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
-
-import java.io.IOException;
import org.apache.hadoop.ozone.client.rest.headers.Header;
import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
import org.apache.hadoop.ozone.client.rest.response.KeyInfo;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.ServicePort;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.http.HttpEntity;
@@ -64,19 +69,20 @@ import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.ParseException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static java.net.HttpURLConnection.HTTP_CREATED;
import static java.net.HttpURLConnection.HTTP_OK;
@@ -108,13 +114,7 @@ public class RestClient implements ClientProtocol {
try {
Preconditions.checkNotNull(conf);
this.conf = conf;
- int port = conf.getInt(OzoneConfigKeys.OZONE_REST_CLIENT_PORT,
- OzoneConfigKeys.OZONE_REST_CLIENT_PORT_DEFAULT);
- URIBuilder uriBuilder = new URIBuilder()
- .setScheme("http")
- .setHost(getOzoneRestHandlerHost())
- .setPort(port);
- this.ozoneRestUri = uriBuilder.build();
+
long socketTimeout = conf.getTimeDuration(
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT,
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT,
@@ -129,7 +129,8 @@ public class RestClient implements ClientProtocol {
int maxConnectionPerRoute = conf.getInt(
OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX,
- OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX_DEFAULT
+ OzoneConfigKeys
+ .OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX_DEFAULT
);
/*
@@ -152,26 +153,55 @@ public class RestClient implements ClientProtocol {
this.ugi = UserGroupInformation.getCurrentUser();
this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
+
+ // TODO: Add new configuration parameter to configure RestServerSelector.
+ RestServerSelector defaultSelector = new DefaultRestServerSelector();
+ InetSocketAddress restServer = getOzoneRestServerAddress(defaultSelector);
+ URIBuilder uriBuilder = new URIBuilder()
+ .setScheme("http")
+ .setHost(restServer.getHostName())
+ .setPort(restServer.getPort());
+ this.ozoneRestUri = uriBuilder.build();
+
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
- /**
- * Returns the REST server host to connect to.
- *
- * @return hostname of REST server
- */
- private String getOzoneRestHandlerHost() {
- List servers = new ArrayList<>(conf.getTrimmedStringCollection(
- OzoneConfigKeys.OZONE_REST_SERVERS));
- if(servers.isEmpty()) {
- throw new IllegalArgumentException(OzoneConfigKeys.OZONE_REST_SERVERS +
- " must be defined. See" +
- " https://wiki.apache.org/hadoop/Ozone#Configuration for" +
- " details on configuring Ozone.");
+ private InetSocketAddress getOzoneRestServerAddress(
+ RestServerSelector selector) throws IOException {
+ String httpAddress = conf.get(KSMConfigKeys.OZONE_KSM_HTTP_ADDRESS_KEY);
+
+ if (httpAddress == null) {
+ throw new IllegalArgumentException(
+ KSMConfigKeys.OZONE_KSM_HTTP_ADDRESS_KEY + " must be defined. See" +
+ " https://wiki.apache.org/hadoop/Ozone#Configuration for" +
+ " details on configuring Ozone.");
+ }
+
+ HttpGet httpGet = new HttpGet("http://" + httpAddress + "/serviceList");
+ HttpEntity entity = executeHttpRequest(httpGet);
+ try {
+ String serviceListJson = EntityUtils.toString(entity);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ TypeReference> serviceInfoReference =
+ new TypeReference>() {
+ };
+ List services = objectMapper.readValue(
+ serviceListJson, serviceInfoReference);
+
+ List dataNodeInfos = services.stream().filter(
+ a -> a.getNodeType().equals(OzoneProtos.NodeType.DATANODE))
+ .collect(Collectors.toList());
+
+ ServiceInfo restServer = selector.getRestServer(dataNodeInfos);
+
+ return NetUtils.createSocketAddr(restServer.getHostname() + ":" +
+ restServer.getPort(ServicePort.Type.HTTP));
+ } finally {
+ EntityUtils.consume(entity);
}
- return servers.get(new Random().nextInt(servers.size()));
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/RestServerSelector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/RestServerSelector.java
new file mode 100644
index 00000000000..54e219b92bc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/RestServerSelector.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rest;
+
+import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
+
+import java.util.List;
+
+/**
+ * The implementor of this interface should select the REST server which will
+ * be used by the client to connect to Ozone Cluster, given list of
+ * REST Servers/DataNodes (DataNodes are the ones which hosts REST Service).
+ */
+public interface RestServerSelector {
+
+ /**
+ * Returns the REST Service which will be used by the client for connection.
+ *
+ * @param restServices list of available REST servers
+ * @return ServiceInfo
+ */
+ ServiceInfo getRestServer(List restServices);
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 94038e2407e..abcb7be9176 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
import org.apache.hadoop.ozone.ksm.protocolPB
.KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.ksm.protocolPB
@@ -54,6 +55,8 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.ServicePort;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
import org.apache.hadoop.scm.ScmConfigKeys;
@@ -123,8 +126,7 @@ public class RpcClient implements ClientProtocol {
long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
- InetSocketAddress scmAddress =
- OzoneClientUtils.getScmAddressForClients(conf);
+ InetSocketAddress scmAddress = getScmAddressForClient();
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
this.storageContainerLocationClient =
@@ -150,6 +152,15 @@ public class RpcClient implements ClientProtocol {
}
}
+ private InetSocketAddress getScmAddressForClient() throws IOException {
+ List services = keySpaceManagerClient.getServiceList();
+ ServiceInfo scmInfo = services.stream().filter(
+ a -> a.getNodeType().equals(OzoneProtos.NodeType.SCM))
+ .collect(Collectors.toList()).get(0);
+ return NetUtils.createSocketAddr(scmInfo.getHostname()+ ":" +
+ scmInfo.getPort(ServicePort.Type.RPC));
+ }
+
@Override
public void createVolume(String volumeName) throws IOException {
createVolume(volumeName, VolumeArgs.newBuilder().build());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Corona.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Corona.java
index 71da4d68ab7..ccf0aef7d51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Corona.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Corona.java
@@ -187,8 +187,7 @@ public final class Corona extends Configured implements Tool {
numberOfVolumesCreated = new AtomicInteger();
numberOfBucketsCreated = new AtomicInteger();
numberOfKeysAdded = new AtomicLong();
- OzoneClientFactory.setConfiguration(conf);
- ozoneClient = OzoneClientFactory.getClient();
+ ozoneClient = OzoneClientFactory.getClient(conf);
objectStore = ozoneClient.getObjectStore();
for (CoronaOps ops : CoronaOps.values()) {
histograms.add(ops.ordinal(), new Histogram(new UniformReservoir()));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
index 2bc5818ea7a..c5d80c0962f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.client.rest;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -46,6 +45,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -77,15 +77,11 @@ public class TestOzoneRestClient {
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
- DataNode datanode = cluster.getDataNodes().get(0);
- conf.set(OzoneConfigKeys.OZONE_CLIENT_PROTOCOL,
- "org.apache.hadoop.ozone.client.rest.RestClient");
- conf.set(OzoneConfigKeys.OZONE_REST_SERVERS,
- datanode.getDatanodeHostname());
- conf.set(OzoneConfigKeys.OZONE_REST_CLIENT_PORT,
- Integer.toString(datanode.getInfoPort()));
- OzoneClientFactory.setConfiguration(conf);
- ozClient = OzoneClientFactory.getClient();
+
+ InetSocketAddress ksmHttpAddress = cluster.getKeySpaceManager()
+ .getHttpServer().getHttpAddress();
+ ozClient = OzoneClientFactory.getRestClient(ksmHttpAddress.getHostName(),
+ ksmHttpAddress.getPort(), conf);
store = ozClient.getObjectStore();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 3e461cdb419..4dae5e7423b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -93,10 +93,7 @@ public class TestOzoneRpcClient {
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1);
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(10)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
- conf.set("ozone.client.protocol",
- "org.apache.hadoop.ozone.client.rpc.RpcClient");
- OzoneClientFactory.setConfiguration(conf);
- ozClient = OzoneClientFactory.getClient();
+ ozClient = OzoneClientFactory.getRpcClient(conf);
store = ozClient.getObjectStore();
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index 1007f3f40d5..989499e981a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -59,8 +59,7 @@ public class TestCloseContainerHandler {
cluster.waitOzoneReady();
//the easiest way to create an open container is creating a key
- OzoneClientFactory.setConfiguration(conf);
- OzoneClient client = OzoneClientFactory.getClient();
+ OzoneClient client = OzoneClientFactory.getClient(conf);
ObjectStore objectStore = client.getObjectStore();
objectStore.createVolume("test");
objectStore.getVolume("test").createBucket("test");