HDFS-12869. Ozone: Service Discovery: RPC endpoint in KSM for getServiceList. Contributed by Nanda kumar.
This commit is contained in:
parent
79432ee6cf
commit
4b9f66a19e
|
@ -0,0 +1,191 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.ksm.helpers;
|
||||||
|
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
|
||||||
|
.ServicePort;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeType;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ServiceInfo holds the config details of Ozone services.
|
||||||
|
*/
|
||||||
|
public final class ServiceInfo {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Type of node/service.
|
||||||
|
*/
|
||||||
|
private final NodeType nodeType;
|
||||||
|
/**
|
||||||
|
* Hostname of the node in which the service is running.
|
||||||
|
*/
|
||||||
|
private final String hostname;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List of ports the service listens to.
|
||||||
|
*/
|
||||||
|
private final Map<ServicePort.Type, ServicePort> portsMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs the ServiceInfo for the {@code nodeType}.
|
||||||
|
* @param nodeType type of node/service
|
||||||
|
* @param hostname hostname of the service
|
||||||
|
* @param ports list of ports the service listens to
|
||||||
|
*/
|
||||||
|
private ServiceInfo(
|
||||||
|
NodeType nodeType, String hostname, List<ServicePort> ports) {
|
||||||
|
Preconditions.checkNotNull(nodeType);
|
||||||
|
Preconditions.checkNotNull(hostname);
|
||||||
|
this.nodeType = nodeType;
|
||||||
|
this.hostname = hostname;
|
||||||
|
this.portsMap = new HashMap<>();
|
||||||
|
for (ServicePort port : ports) {
|
||||||
|
portsMap.put(port.getType(), port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the type of node/service.
|
||||||
|
* @return node type
|
||||||
|
*/
|
||||||
|
public NodeType getNodeType() {
|
||||||
|
return nodeType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the hostname of the service.
|
||||||
|
* @return hostname
|
||||||
|
*/
|
||||||
|
public String getHostname() {
|
||||||
|
return hostname;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the list of port which the service listens to.
|
||||||
|
* @return List<ServicePort>
|
||||||
|
*/
|
||||||
|
public List<ServicePort> getPorts() {
|
||||||
|
return portsMap.values().parallelStream().collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the port for given type, null if the service doesn't support
|
||||||
|
* the type.
|
||||||
|
*
|
||||||
|
* @param type the type of port.
|
||||||
|
* ex: RPC, HTTP, HTTPS, etc..
|
||||||
|
*/
|
||||||
|
public int getPort(ServicePort.Type type) {
|
||||||
|
return portsMap.get(type).getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts {@link ServiceInfo} to KeySpaceManagerProtocolProtos.ServiceInfo.
|
||||||
|
*
|
||||||
|
* @return KeySpaceManagerProtocolProtos.ServiceInfo
|
||||||
|
*/
|
||||||
|
public KeySpaceManagerProtocolProtos.ServiceInfo getProtobuf() {
|
||||||
|
KeySpaceManagerProtocolProtos.ServiceInfo.Builder builder =
|
||||||
|
KeySpaceManagerProtocolProtos.ServiceInfo.newBuilder();
|
||||||
|
builder.setNodeType(nodeType)
|
||||||
|
.setHostname(hostname)
|
||||||
|
.addAllServicePorts(portsMap.values());
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts KeySpaceManagerProtocolProtos.ServiceInfo to {@link ServiceInfo}.
|
||||||
|
*
|
||||||
|
* @return {@link ServiceInfo}
|
||||||
|
*/
|
||||||
|
public static ServiceInfo getFromProtobuf(
|
||||||
|
KeySpaceManagerProtocolProtos.ServiceInfo serviceInfo) {
|
||||||
|
return new ServiceInfo(serviceInfo.getNodeType(),
|
||||||
|
serviceInfo.getHostname(),
|
||||||
|
serviceInfo.getServicePortsList());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new builder to build {@link ServiceInfo}.
|
||||||
|
* @return {@link ServiceInfo.Builder}
|
||||||
|
*/
|
||||||
|
public static Builder newBuilder() {
|
||||||
|
return new Builder();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builder used to build/construct {@link ServiceInfo}.
|
||||||
|
*/
|
||||||
|
public static class Builder {
|
||||||
|
|
||||||
|
private NodeType node;
|
||||||
|
private String host;
|
||||||
|
private List<ServicePort> ports = new ArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the node/service type.
|
||||||
|
* @param nodeType type of node
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
|
public Builder setNodeType(NodeType nodeType) {
|
||||||
|
node = nodeType;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the hostname of the service.
|
||||||
|
* @param hostname service hostname
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
|
public Builder setHostname(String hostname) {
|
||||||
|
host = hostname;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the service port to the service port list.
|
||||||
|
* @param servicePort RPC port
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
|
public Builder addServicePort(ServicePort servicePort) {
|
||||||
|
ports.add(servicePort);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builds and returns {@link ServiceInfo} with the set values.
|
||||||
|
* @return {@link ServiceInfo}
|
||||||
|
*/
|
||||||
|
public ServiceInfo build() {
|
||||||
|
return new ServiceInfo(node, host, ports);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
|
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
|
||||||
|
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
|
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -233,4 +234,12 @@ public interface KeySpaceManagerProtocol {
|
||||||
List<KsmKeyInfo> listKeys(String volumeName,
|
List<KsmKeyInfo> listKeys(String volumeName,
|
||||||
String bucketName, String startKeyName, String keyPrefix, int maxKeys)
|
String bucketName, String startKeyName, String keyPrefix, int maxKeys)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns list of Ozone services with its configuration details.
|
||||||
|
*
|
||||||
|
* @return list of Ozone services
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
List<ServiceInfo> getServiceList() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
|
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
|
||||||
|
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
|
import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.AllocateBlockRequest;
|
.KeySpaceManagerProtocolProtos.AllocateBlockRequest;
|
||||||
|
@ -90,18 +91,24 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.ListBucketsRequest;
|
.KeySpaceManagerProtocolProtos.ListBucketsRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.ListBucketsResponse;
|
.KeySpaceManagerProtocolProtos.ListBucketsResponse;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysRequest;
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysResponse;
|
.KeySpaceManagerProtocolProtos.ListKeysRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ListKeysResponse;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.VolumeInfo;
|
.KeySpaceManagerProtocolProtos.VolumeInfo;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.Status;
|
.KeySpaceManagerProtocolProtos.Status;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
|
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.ListVolumeRequest;
|
.KeySpaceManagerProtocolProtos.ListVolumeRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.ListVolumeResponse;
|
.KeySpaceManagerProtocolProtos.ListVolumeResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ServiceListRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ServiceListResponse;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -705,6 +712,26 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ServiceInfo> getServiceList() throws IOException {
|
||||||
|
ServiceListRequest request = ServiceListRequest.newBuilder().build();
|
||||||
|
final ServiceListResponse resp;
|
||||||
|
try {
|
||||||
|
resp = rpcProxy.getServiceList(NULL_RPC_CONTROLLER, request);
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (resp.getStatus() == Status.OK) {
|
||||||
|
return resp.getServiceInfoList().stream()
|
||||||
|
.map(ServiceInfo::getFromProtobuf)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
} else {
|
||||||
|
throw new IOException("Getting service list failed, error: "
|
||||||
|
+ resp.getStatus());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the proxy object underlying this protocol translator.
|
* Return the proxy object underlying this protocol translator.
|
||||||
*
|
*
|
||||||
|
|
|
@ -318,6 +318,31 @@ message CommitKeyResponse {
|
||||||
required Status status = 1;
|
required Status status = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message ServiceListRequest {
|
||||||
|
}
|
||||||
|
|
||||||
|
message ServiceListResponse {
|
||||||
|
required Status status = 1;
|
||||||
|
repeated ServiceInfo serviceInfo = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ServicePort {
|
||||||
|
enum Type {
|
||||||
|
RPC = 1;
|
||||||
|
HTTP = 2;
|
||||||
|
HTTPS = 3;
|
||||||
|
RATIS = 4;
|
||||||
|
};
|
||||||
|
required Type type = 1;
|
||||||
|
required uint32 value = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ServiceInfo {
|
||||||
|
required NodeType nodeType = 1;
|
||||||
|
required string hostname = 2;
|
||||||
|
repeated ServicePort servicePorts = 3;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
The KSM service that takes care of Ozone namespace.
|
The KSM service that takes care of Ozone namespace.
|
||||||
*/
|
*/
|
||||||
|
@ -423,4 +448,10 @@ service KeySpaceManagerService {
|
||||||
*/
|
*/
|
||||||
rpc allocateBlock(AllocateBlockRequest)
|
rpc allocateBlock(AllocateBlockRequest)
|
||||||
returns(AllocateBlockResponse);
|
returns(AllocateBlockResponse);
|
||||||
|
|
||||||
|
/**
|
||||||
|
Returns list of Ozone services with its configuration details.
|
||||||
|
*/
|
||||||
|
rpc getServiceList(ServiceListRequest)
|
||||||
|
returns(ServiceListResponse);
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,6 +58,7 @@ public class KSMMetrics {
|
||||||
private @Metric MutableCounterLong numVolumeLists;
|
private @Metric MutableCounterLong numVolumeLists;
|
||||||
private @Metric MutableCounterLong numKeyCommits;
|
private @Metric MutableCounterLong numKeyCommits;
|
||||||
private @Metric MutableCounterLong numAllocateBlockCalls;
|
private @Metric MutableCounterLong numAllocateBlockCalls;
|
||||||
|
private @Metric MutableCounterLong numGetServiceLists;
|
||||||
|
|
||||||
// Failure Metrics
|
// Failure Metrics
|
||||||
private @Metric MutableCounterLong numVolumeCreateFails;
|
private @Metric MutableCounterLong numVolumeCreateFails;
|
||||||
|
@ -77,6 +78,7 @@ public class KSMMetrics {
|
||||||
private @Metric MutableCounterLong numVolumeListFails;
|
private @Metric MutableCounterLong numVolumeListFails;
|
||||||
private @Metric MutableCounterLong numKeyCommitFails;
|
private @Metric MutableCounterLong numKeyCommitFails;
|
||||||
private @Metric MutableCounterLong numBlockAllocateCallFails;
|
private @Metric MutableCounterLong numBlockAllocateCallFails;
|
||||||
|
private @Metric MutableCounterLong numGetServiceListFails;
|
||||||
|
|
||||||
public KSMMetrics() {
|
public KSMMetrics() {
|
||||||
}
|
}
|
||||||
|
@ -148,6 +150,10 @@ public class KSMMetrics {
|
||||||
numVolumeLists.incr();
|
numVolumeLists.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumGetServiceLists() {
|
||||||
|
numGetServiceLists.incr();
|
||||||
|
}
|
||||||
|
|
||||||
public void incNumVolumeCreateFails() {
|
public void incNumVolumeCreateFails() {
|
||||||
numVolumeCreateFails.incr();
|
numVolumeCreateFails.incr();
|
||||||
}
|
}
|
||||||
|
@ -240,6 +246,10 @@ public class KSMMetrics {
|
||||||
numVolumeListFails.incr();
|
numVolumeListFails.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumGetServiceListFails() {
|
||||||
|
numGetServiceListFails.incr();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumVolumeCreates() {
|
public long getNumVolumeCreates() {
|
||||||
return numVolumeCreates.value();
|
return numVolumeCreates.value();
|
||||||
|
@ -300,6 +310,11 @@ public class KSMMetrics {
|
||||||
return numKeyLists.value();
|
return numKeyLists.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumGetServiceLists() {
|
||||||
|
return numGetServiceLists.value();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumVolumeCreateFails() {
|
public long getNumVolumeCreateFails() {
|
||||||
return numVolumeCreateFails.value();
|
return numVolumeCreateFails.value();
|
||||||
|
@ -410,6 +425,11 @@ public class KSMMetrics {
|
||||||
return numBlockAllocateCallFails.value();
|
return numBlockAllocateCallFails.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumGetServiceListFails() {
|
||||||
|
return numGetServiceListFails.value();
|
||||||
|
}
|
||||||
|
|
||||||
public void unRegister() {
|
public void unRegister() {
|
||||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
ms.unregisterSource(SOURCE_NAME);
|
ms.unregisterSource(SOURCE_NAME);
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.ozone.ksm;
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.protobuf.BlockingService;
|
import com.google.protobuf.BlockingService;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
@ -34,6 +35,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
|
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
|
||||||
|
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
|
import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
|
||||||
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
||||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
|
||||||
|
@ -42,8 +44,11 @@ import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
||||||
import org.apache.hadoop.conf.OzoneConfiguration;
|
import org.apache.hadoop.conf.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
|
||||||
|
.ServicePort;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
|
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||||
import org.apache.hadoop.ozone.protocolPB
|
import org.apache.hadoop.ozone.protocolPB
|
||||||
.KeySpaceManagerProtocolServerSideTranslatorPB;
|
.KeySpaceManagerProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.scm.ScmInfo;
|
import org.apache.hadoop.scm.ScmInfo;
|
||||||
|
@ -60,6 +65,7 @@ import javax.management.ObjectName;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -116,6 +122,7 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final OzoneConfiguration configuration;
|
||||||
private final RPC.Server ksmRpcServer;
|
private final RPC.Server ksmRpcServer;
|
||||||
private final InetSocketAddress ksmRpcAddress;
|
private final InetSocketAddress ksmRpcAddress;
|
||||||
private final KSMMetadataManager metadataManager;
|
private final KSMMetadataManager metadataManager;
|
||||||
|
@ -125,11 +132,14 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
||||||
private final KSMMetrics metrics;
|
private final KSMMetrics metrics;
|
||||||
private final KeySpaceManagerHttpServer httpServer;
|
private final KeySpaceManagerHttpServer httpServer;
|
||||||
private final KSMStorage ksmStorage;
|
private final KSMStorage ksmStorage;
|
||||||
|
private final ScmBlockLocationProtocol scmBlockClient;
|
||||||
private ObjectName ksmInfoBeanName;
|
private ObjectName ksmInfoBeanName;
|
||||||
|
|
||||||
private KeySpaceManager(OzoneConfiguration conf) throws IOException {
|
private KeySpaceManager(OzoneConfiguration conf) throws IOException {
|
||||||
|
Preconditions.checkNotNull(conf);
|
||||||
|
configuration = conf;
|
||||||
ksmStorage = new KSMStorage(conf);
|
ksmStorage = new KSMStorage(conf);
|
||||||
ScmBlockLocationProtocol scmBlockClient = getScmBlockClient(conf);
|
scmBlockClient = getScmBlockClient(configuration);
|
||||||
if (ksmStorage.getState() != StorageState.INITIALIZED) {
|
if (ksmStorage.getState() != StorageState.INITIALIZED) {
|
||||||
throw new KSMException("KSM not initialized.",
|
throw new KSMException("KSM not initialized.",
|
||||||
ResultCodes.KSM_NOT_INITIALIZED);
|
ResultCodes.KSM_NOT_INITIALIZED);
|
||||||
|
@ -145,32 +155,31 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
||||||
final int handlerCount = conf.getInt(OZONE_KSM_HANDLER_COUNT_KEY,
|
final int handlerCount = conf.getInt(OZONE_KSM_HANDLER_COUNT_KEY,
|
||||||
OZONE_KSM_HANDLER_COUNT_DEFAULT);
|
OZONE_KSM_HANDLER_COUNT_DEFAULT);
|
||||||
|
|
||||||
RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class,
|
RPC.setProtocolEngine(configuration, KeySpaceManagerProtocolPB.class,
|
||||||
ProtobufRpcEngine.class);
|
ProtobufRpcEngine.class);
|
||||||
|
|
||||||
BlockingService ksmService = newReflectiveBlockingService(
|
BlockingService ksmService = newReflectiveBlockingService(
|
||||||
new KeySpaceManagerProtocolServerSideTranslatorPB(this));
|
new KeySpaceManagerProtocolServerSideTranslatorPB(this));
|
||||||
final InetSocketAddress ksmNodeRpcAddr = OzoneClientUtils.
|
final InetSocketAddress ksmNodeRpcAddr = OzoneClientUtils.
|
||||||
getKsmAddress(conf);
|
getKsmAddress(configuration);
|
||||||
ksmRpcServer = startRpcServer(conf, ksmNodeRpcAddr,
|
ksmRpcServer = startRpcServer(configuration, ksmNodeRpcAddr,
|
||||||
KeySpaceManagerProtocolPB.class, ksmService,
|
KeySpaceManagerProtocolPB.class, ksmService,
|
||||||
handlerCount);
|
handlerCount);
|
||||||
ksmRpcAddress = OzoneClientUtils.updateRPCListenAddress(conf,
|
ksmRpcAddress = OzoneClientUtils.updateRPCListenAddress(configuration,
|
||||||
OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
|
OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
|
||||||
metadataManager = new KSMMetadataManagerImpl(conf);
|
metadataManager = new KSMMetadataManagerImpl(configuration);
|
||||||
volumeManager = new VolumeManagerImpl(metadataManager, conf);
|
volumeManager = new VolumeManagerImpl(metadataManager, configuration);
|
||||||
bucketManager = new BucketManagerImpl(metadataManager);
|
bucketManager = new BucketManagerImpl(metadataManager);
|
||||||
metrics = KSMMetrics.create();
|
metrics = KSMMetrics.create();
|
||||||
keyManager = new KeyManagerImpl(
|
keyManager = new KeyManagerImpl(scmBlockClient, metadataManager,
|
||||||
getScmBlockClient(conf), metadataManager, conf);
|
configuration);
|
||||||
httpServer = new KeySpaceManagerHttpServer(conf);
|
httpServer = new KeySpaceManagerHttpServer(configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a scm block client, used by putKey() and getKey().
|
* Create a scm block client, used by putKey() and getKey().
|
||||||
*
|
*
|
||||||
* @param conf
|
* @return {@link ScmBlockLocationProtocol}
|
||||||
* @return
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private static ScmBlockLocationProtocol getScmBlockClient(
|
private static ScmBlockLocationProtocol getScmBlockClient(
|
||||||
|
@ -191,8 +200,8 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public ScmInfo getScmInfo(OzoneConfiguration conf) throws IOException {
|
public ScmInfo getScmInfo() throws IOException {
|
||||||
return getScmBlockClient(conf).getScmInfo();
|
return scmBlockClient.getScmInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -775,4 +784,51 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
||||||
public String getRpcPort() {
|
public String getRpcPort() {
|
||||||
return "" + ksmRpcAddress.getPort();
|
return "" + ksmRpcAddress.getPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ServiceInfo> getServiceList() throws IOException {
|
||||||
|
// When we implement multi-home this call has to be handled properly.
|
||||||
|
List<ServiceInfo> services = new ArrayList<>();
|
||||||
|
ServiceInfo.Builder ksmServiceInfoBuilder = ServiceInfo.newBuilder()
|
||||||
|
.setNodeType(OzoneProtos.NodeType.KSM)
|
||||||
|
.setHostname(ksmRpcAddress.getHostName())
|
||||||
|
.addServicePort(ServicePort.newBuilder()
|
||||||
|
.setType(ServicePort.Type.RPC)
|
||||||
|
.setValue(ksmRpcAddress.getPort())
|
||||||
|
.build());
|
||||||
|
if (httpServer.getHttpAddress() != null) {
|
||||||
|
ksmServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
|
||||||
|
.setType(ServicePort.Type.HTTP)
|
||||||
|
.setValue(httpServer.getHttpAddress().getPort())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
if (httpServer.getHttpsAddress() != null) {
|
||||||
|
ksmServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
|
||||||
|
.setType(ServicePort.Type.HTTPS)
|
||||||
|
.setValue(httpServer.getHttpsAddress().getPort())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
// For client we have to return SCM with container protocol port,
|
||||||
|
// not block protocol.
|
||||||
|
InetSocketAddress scmAddr = OzoneClientUtils.getScmAddressForClients(
|
||||||
|
configuration);
|
||||||
|
ServiceInfo.Builder scmServiceInfoBuilder = ServiceInfo.newBuilder()
|
||||||
|
.setNodeType(OzoneProtos.NodeType.SCM)
|
||||||
|
.setHostname(scmAddr.getHostName())
|
||||||
|
.addServicePort(ServicePort.newBuilder()
|
||||||
|
.setType(ServicePort.Type.RPC)
|
||||||
|
.setValue(scmAddr.getPort()).build());
|
||||||
|
|
||||||
|
// TODO: REST servers (datanode) details to be added later.
|
||||||
|
|
||||||
|
services.add(ksmServiceInfoBuilder.build());
|
||||||
|
services.add(scmServiceInfoBuilder.build());
|
||||||
|
metrics.incNumGetServiceLists();
|
||||||
|
// For now there is no exception that can can happen in this call,
|
||||||
|
// so failure metrics is not handled. In future if there is any need to
|
||||||
|
// handle exception in this method, we need to incorporate
|
||||||
|
// metrics.incNumGetServiceListFails()
|
||||||
|
return services;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
|
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
|
||||||
|
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
|
import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
|
||||||
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
||||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
|
@ -87,16 +88,23 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.ListBucketsRequest;
|
.KeySpaceManagerProtocolProtos.ListBucketsRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.ListBucketsResponse;
|
.KeySpaceManagerProtocolProtos.ListBucketsResponse;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysRequest;
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysResponse;
|
.KeySpaceManagerProtocolProtos.ListKeysRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ListKeysResponse;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.Status;
|
.KeySpaceManagerProtocolProtos.Status;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ServiceListRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ServiceListResponse;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is the server-side translator that forwards requests received on
|
* This class is the server-side translator that forwards requests received on
|
||||||
|
@ -501,4 +509,19 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
|
||||||
}
|
}
|
||||||
return resp.build();
|
return resp.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServiceListResponse getServiceList(RpcController controller,
|
||||||
|
ServiceListRequest request) throws ServiceException {
|
||||||
|
ServiceListResponse.Builder resp = ServiceListResponse.newBuilder();
|
||||||
|
try {
|
||||||
|
resp.addAllServiceInfo(impl.getServiceList().stream()
|
||||||
|
.map(ServiceInfo::getProtobuf)
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
resp.setStatus(Status.OK);
|
||||||
|
} catch (IOException e) {
|
||||||
|
resp.setStatus(exceptionToResponseStatus(e));
|
||||||
|
}
|
||||||
|
return resp.build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.commons.lang.RandomStringUtils;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
|
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
|
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
|
||||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
@ -30,6 +31,10 @@ import org.apache.hadoop.ozone.common.BlockGroup;
|
||||||
import org.apache.hadoop.ozone.client.rest.OzoneException;
|
import org.apache.hadoop.ozone.client.rest.OzoneException;
|
||||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
import org.apache.hadoop.ozone.scm.SCMStorage;
|
import org.apache.hadoop.ozone.scm.SCMStorage;
|
||||||
|
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ServicePort;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||||
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
||||||
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
|
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
|
||||||
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
||||||
|
@ -66,6 +71,7 @@ import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -80,6 +86,9 @@ import java.util.stream.Stream;
|
||||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
|
||||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
|
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
|
||||||
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test Key Space Manager operation in distributed handler scenario.
|
* Test Key Space Manager operation in distributed handler scenario.
|
||||||
*/
|
*/
|
||||||
|
@ -1063,7 +1072,7 @@ public class TestKeySpaceManager {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testGetScmInfo() throws IOException {
|
public void testGetScmInfo() throws IOException {
|
||||||
ScmInfo info = cluster.getKeySpaceManager().getScmInfo(conf);
|
ScmInfo info = cluster.getKeySpaceManager().getScmInfo();
|
||||||
Assert.assertEquals(clusterId, info.getClusterId());
|
Assert.assertEquals(clusterId, info.getClusterId());
|
||||||
Assert.assertEquals(scmId, info.getScmId());
|
Assert.assertEquals(scmId, info.getScmId());
|
||||||
}
|
}
|
||||||
|
@ -1185,4 +1194,29 @@ public class TestKeySpaceManager {
|
||||||
exception.expectMessage("SCM version info mismatch.");
|
exception.expectMessage("SCM version info mismatch.");
|
||||||
KeySpaceManager.createKSM(null, conf);
|
KeySpaceManager.createKSM(null, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetServiceList() throws IOException {
|
||||||
|
long numGetServiceListCalls = ksmMetrics.getNumGetServiceLists();
|
||||||
|
List<ServiceInfo> services = cluster.getKeySpaceManager().getServiceList();
|
||||||
|
|
||||||
|
Assert.assertEquals(numGetServiceListCalls + 1,
|
||||||
|
ksmMetrics.getNumGetServiceLists());
|
||||||
|
|
||||||
|
ServiceInfo ksmInfo = services.stream().filter(
|
||||||
|
a -> a.getNodeType().equals(OzoneProtos.NodeType.KSM))
|
||||||
|
.collect(Collectors.toList()).get(0);
|
||||||
|
InetSocketAddress ksmAddress = new InetSocketAddress(ksmInfo.getHostname(),
|
||||||
|
ksmInfo.getPort(ServicePort.Type.RPC));
|
||||||
|
Assert.assertEquals(NetUtils.createSocketAddr(
|
||||||
|
conf.get(OZONE_KSM_ADDRESS_KEY)), ksmAddress);
|
||||||
|
|
||||||
|
ServiceInfo scmInfo = services.stream().filter(
|
||||||
|
a -> a.getNodeType().equals(OzoneProtos.NodeType.SCM))
|
||||||
|
.collect(Collectors.toList()).get(0);
|
||||||
|
InetSocketAddress scmAddress = new InetSocketAddress(scmInfo.getHostname(),
|
||||||
|
scmInfo.getPort(ServicePort.Type.RPC));
|
||||||
|
Assert.assertEquals(NetUtils.createSocketAddr(
|
||||||
|
conf.get(OZONE_SCM_CLIENT_ADDRESS_KEY)), scmAddress);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue