HDFS-12871. Ozone: Service Discovery: Adding REST server details in ServiceList. Contributed by Nanda Kumar.
This commit is contained in:
parent
3965f1ec99
commit
5e31b920f0
|
@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
|
||||||
import com.google.protobuf.BlockingService;
|
import com.google.protobuf.BlockingService;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||||
import org.apache.hadoop.ipc.Client;
|
import org.apache.hadoop.ipc.Client;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
@ -53,8 +54,13 @@ import org.apache.hadoop.ozone.protocolPB
|
||||||
.KeySpaceManagerProtocolServerSideTranslatorPB;
|
.KeySpaceManagerProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.scm.ScmInfo;
|
import org.apache.hadoop.scm.ScmInfo;
|
||||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||||
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
|
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
||||||
|
import org.apache.hadoop.scm.protocolPB
|
||||||
|
.ScmBlockLocationProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||||
|
import org.apache.hadoop.scm.protocolPB
|
||||||
|
.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||||
|
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.GenericOptionsParser;
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
@ -66,6 +72,7 @@ import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -80,6 +87,8 @@ import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
||||||
import static org.apache.hadoop.ozone.protocol.proto
|
import static org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.KeySpaceManagerService
|
.KeySpaceManagerProtocolProtos.KeySpaceManagerService
|
||||||
.newReflectiveBlockingService;
|
.newReflectiveBlockingService;
|
||||||
|
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
|
||||||
|
.NodeState.HEALTHY;
|
||||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -133,6 +142,7 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
||||||
private final KeySpaceManagerHttpServer httpServer;
|
private final KeySpaceManagerHttpServer httpServer;
|
||||||
private final KSMStorage ksmStorage;
|
private final KSMStorage ksmStorage;
|
||||||
private final ScmBlockLocationProtocol scmBlockClient;
|
private final ScmBlockLocationProtocol scmBlockClient;
|
||||||
|
private final StorageContainerLocationProtocol scmContainerClient;
|
||||||
private ObjectName ksmInfoBeanName;
|
private ObjectName ksmInfoBeanName;
|
||||||
|
|
||||||
private KeySpaceManager(OzoneConfiguration conf) throws IOException {
|
private KeySpaceManager(OzoneConfiguration conf) throws IOException {
|
||||||
|
@ -140,6 +150,7 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
||||||
configuration = conf;
|
configuration = conf;
|
||||||
ksmStorage = new KSMStorage(conf);
|
ksmStorage = new KSMStorage(conf);
|
||||||
scmBlockClient = getScmBlockClient(configuration);
|
scmBlockClient = getScmBlockClient(configuration);
|
||||||
|
scmContainerClient = getScmContainerClient(configuration);
|
||||||
if (ksmStorage.getState() != StorageState.INITIALIZED) {
|
if (ksmStorage.getState() != StorageState.INITIALIZED) {
|
||||||
throw new KSMException("KSM not initialized.",
|
throw new KSMException("KSM not initialized.",
|
||||||
ResultCodes.KSM_NOT_INITIALIZED);
|
ResultCodes.KSM_NOT_INITIALIZED);
|
||||||
|
@ -199,6 +210,29 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
||||||
return scmBlockLocationClient;
|
return scmBlockLocationClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a scm container client.
|
||||||
|
*
|
||||||
|
* @return {@link StorageContainerLocationProtocol}
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private static StorageContainerLocationProtocol getScmContainerClient(
|
||||||
|
OzoneConfiguration conf) throws IOException {
|
||||||
|
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
|
||||||
|
ProtobufRpcEngine.class);
|
||||||
|
long scmVersion =
|
||||||
|
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
|
||||||
|
InetSocketAddress scmAddr = OzoneClientUtils.getScmAddressForClients(
|
||||||
|
conf);
|
||||||
|
StorageContainerLocationProtocolClientSideTranslatorPB scmContainerClient =
|
||||||
|
new StorageContainerLocationProtocolClientSideTranslatorPB(
|
||||||
|
RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
|
||||||
|
scmAddr, UserGroupInformation.getCurrentUser(), conf,
|
||||||
|
NetUtils.getDefaultSocketFactory(conf),
|
||||||
|
Client.getRpcTimeout(conf)));
|
||||||
|
return scmContainerClient;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public ScmInfo getScmInfo() throws IOException {
|
public ScmInfo getScmInfo() throws IOException {
|
||||||
return scmBlockClient.getScmInfo();
|
return scmBlockClient.getScmInfo();
|
||||||
|
@ -813,6 +847,7 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
||||||
.setValue(httpServer.getHttpsAddress().getPort())
|
.setValue(httpServer.getHttpsAddress().getPort())
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
services.add(ksmServiceInfoBuilder.build());
|
||||||
|
|
||||||
// For client we have to return SCM with container protocol port,
|
// For client we have to return SCM with container protocol port,
|
||||||
// not block protocol.
|
// not block protocol.
|
||||||
|
@ -824,11 +859,33 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
||||||
.addServicePort(ServicePort.newBuilder()
|
.addServicePort(ServicePort.newBuilder()
|
||||||
.setType(ServicePort.Type.RPC)
|
.setType(ServicePort.Type.RPC)
|
||||||
.setValue(scmAddr.getPort()).build());
|
.setValue(scmAddr.getPort()).build());
|
||||||
|
|
||||||
// TODO: REST servers (datanode) details to be added later.
|
|
||||||
|
|
||||||
services.add(ksmServiceInfoBuilder.build());
|
|
||||||
services.add(scmServiceInfoBuilder.build());
|
services.add(scmServiceInfoBuilder.build());
|
||||||
|
|
||||||
|
List<OzoneProtos.Node> nodes = scmContainerClient.queryNode(
|
||||||
|
EnumSet.of(HEALTHY), OzoneProtos.QueryScope.CLUSTER, "")
|
||||||
|
.getNodesList();
|
||||||
|
|
||||||
|
for (OzoneProtos.Node node : nodes) {
|
||||||
|
HdfsProtos.DatanodeIDProto datanode = node.getNodeID();
|
||||||
|
|
||||||
|
ServiceInfo.Builder dnServiceInfoBuilder = ServiceInfo.newBuilder()
|
||||||
|
.setNodeType(OzoneProtos.NodeType.DATANODE)
|
||||||
|
.setHostname(datanode.getHostName());
|
||||||
|
|
||||||
|
dnServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
|
||||||
|
.setType(ServicePort.Type.HTTP)
|
||||||
|
.setValue(datanode.getInfoPort())
|
||||||
|
.build());
|
||||||
|
|
||||||
|
if (datanode.hasInfoSecurePort() && datanode.getInfoSecurePort() > 0) {
|
||||||
|
dnServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
|
||||||
|
.setType(ServicePort.Type.HTTPS)
|
||||||
|
.setValue(datanode.getInfoSecurePort())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
services.add(dnServiceInfoBuilder.build());
|
||||||
|
}
|
||||||
|
|
||||||
metrics.incNumGetServiceLists();
|
metrics.incNumGetServiceLists();
|
||||||
// For now there is no exception that can can happen in this call,
|
// For now there is no exception that can can happen in this call,
|
||||||
// so failure metrics is not handled. In future if there is any need to
|
// so failure metrics is not handled. In future if there is any need to
|
||||||
|
|
|
@ -21,13 +21,15 @@ package org.apache.hadoop.ozone.ksm;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import org.apache.hadoop.conf.OzoneConfiguration;
|
import org.apache.hadoop.conf.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
|
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
|
||||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ServicePort;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||||
import org.apache.http.HttpResponse;
|
import org.apache.http.HttpResponse;
|
||||||
import org.apache.http.client.HttpClient;
|
import org.apache.http.client.HttpClient;
|
||||||
|
@ -97,9 +99,9 @@ public class TestKeySpaceManagerRestInterface {
|
||||||
|
|
||||||
Assert.assertEquals(ksmAddress.getHostName(), ksmInfo.getHostname());
|
Assert.assertEquals(ksmAddress.getHostName(), ksmInfo.getHostname());
|
||||||
Assert.assertEquals(ksmAddress.getPort(),
|
Assert.assertEquals(ksmAddress.getPort(),
|
||||||
ksmInfo.getPort(KeySpaceManagerProtocolProtos.ServicePort.Type.RPC));
|
ksmInfo.getPort(ServicePort.Type.RPC));
|
||||||
Assert.assertEquals(server.getHttpAddress().getPort(),
|
Assert.assertEquals(server.getHttpAddress().getPort(),
|
||||||
ksmInfo.getPort(KeySpaceManagerProtocolProtos.ServicePort.Type.HTTP));
|
ksmInfo.getPort(ServicePort.Type.HTTP));
|
||||||
|
|
||||||
InetSocketAddress scmAddress =
|
InetSocketAddress scmAddress =
|
||||||
OzoneClientUtils.getScmAddressForClients(conf);
|
OzoneClientUtils.getScmAddressForClients(conf);
|
||||||
|
@ -107,7 +109,33 @@ public class TestKeySpaceManagerRestInterface {
|
||||||
|
|
||||||
Assert.assertEquals(scmAddress.getHostName(), scmInfo.getHostname());
|
Assert.assertEquals(scmAddress.getHostName(), scmInfo.getHostname());
|
||||||
Assert.assertEquals(scmAddress.getPort(),
|
Assert.assertEquals(scmAddress.getPort(),
|
||||||
scmInfo.getPort(KeySpaceManagerProtocolProtos.ServicePort.Type.RPC));
|
scmInfo.getPort(ServicePort.Type.RPC));
|
||||||
|
|
||||||
|
ServiceInfo datanodeInfo = serviceMap.get(OzoneProtos.NodeType.DATANODE);
|
||||||
|
DataNode datanode = ((MiniOzoneClassicCluster) cluster)
|
||||||
|
.getDataNodes().get(0);
|
||||||
|
Assert.assertEquals(datanode.getDatanodeHostname(),
|
||||||
|
datanodeInfo.getHostname());
|
||||||
|
|
||||||
|
Map<ServicePort.Type, Integer> ports = datanodeInfo.getPorts();
|
||||||
|
for(ServicePort.Type type : ports.keySet()) {
|
||||||
|
switch (type) {
|
||||||
|
case HTTP:
|
||||||
|
Assert.assertEquals(datanode.getInfoPort(),
|
||||||
|
(int) ports.get(type));
|
||||||
|
break;
|
||||||
|
case HTTPS:
|
||||||
|
Assert.assertEquals(datanode.getInfoSecurePort(),
|
||||||
|
(int) ports.get(type));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// KSM only sends Datanode's info port details
|
||||||
|
// i.e. HTTP or HTTPS
|
||||||
|
// Other ports are not expected as of now.
|
||||||
|
Assert.fail();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue