HDFS-10363. Ozone: Introduce new config keys for SCM service endpoints. Contributed by Arpit Agarwal.

This commit is contained in:
Anu Engineer 2016-05-19 11:02:40 -07:00 committed by Owen O'Malley
parent 6c61902389
commit 3beee0b3d1
8 changed files with 633 additions and 148 deletions

View File

@ -17,14 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_TRACE_ENABLED_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_TRACE_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_HANDLER_TYPE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_RPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_RPC_DEFAULT_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS;
import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE;
@ -37,6 +30,7 @@
import com.sun.jersey.api.container.ContainerFactory;
import com.sun.jersey.api.core.ApplicationAdapter;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -91,11 +85,8 @@ public ObjectStoreHandler(Configuration conf) throws IOException {
ProtobufRpcEngine.class);
long version =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
InetSocketAddress address = conf.getSocketAddr(
DFS_CONTAINER_LOCATION_RPC_BIND_HOST_KEY,
DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY,
DFS_CONTAINER_LOCATION_RPC_ADDRESS_DEFAULT,
DFS_CONTAINER_LOCATION_RPC_DEFAULT_PORT);
InetSocketAddress address =
OzoneClientUtils.getScmAddressForClients(conf);
this.storageContainerLocationClient =
new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, version,

View File

@ -0,0 +1,239 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import com.google.common.base.Optional;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
/**
* Utility methods for Ozone and Container Clients.
*
* The methods to retrieve SCM service endpoints assume there is a single
* SCM service instance. This will change when we switch to replicated service
* instances for redundancy.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class OzoneClientUtils {
private static final Logger LOG = LoggerFactory.getLogger(
OzoneClientUtils.class);
/**
* The service ID of the solitary Ozone SCM service.
*/
public static final String OZONE_SCM_SERVICE_ID = "OzoneScmService";
public static final String OZONE_SCM_SERVICE_INSTANCE_ID =
"OzoneScmServiceInstance";
private OzoneClientUtils() {
// Never constructed
}
/**
* Retrieve the socket address that should be used by clients to connect
* to the SCM.
*
* @param conf
* @return Target InetSocketAddress for the SCM client endpoint.
*/
public static InetSocketAddress getScmAddressForClients(Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
OZONE_SCM_CLIENT_ADDRESS_KEY);
if (!host.isPresent()) {
throw new IllegalArgumentException(OZONE_SCM_CLIENT_ADDRESS_KEY +
" must be defined. See" +
" https://wiki.apache.org/hadoop/Ozone#Configuration for details" +
" on configuring Ozone.");
}
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
OZONE_SCM_CLIENT_ADDRESS_KEY);
return NetUtils.createSocketAddr(host.get() + ":" +
port.or(OZONE_SCM_CLIENT_PORT_DEFAULT));
}
/**
* Retrieve the socket address that should be used by DataNodes to connect
* to the SCM.
*
* @param conf
* @return Target InetSocketAddress for the SCM service endpoint.
*/
public static InetSocketAddress getScmAddressForDataNodes(
Configuration conf) {
// We try the following settings in decreasing priority to retrieve the
// target host.
// - OZONE_SCM_DATANODE_ADDRESS_KEY
// - OZONE_SCM_CLIENT_ADDRESS_KEY
//
final Optional<String> host = getHostNameFromConfigKeys(conf,
OZONE_SCM_DATANODE_ADDRESS_KEY, OZONE_SCM_CLIENT_ADDRESS_KEY);
if (!host.isPresent()) {
throw new IllegalArgumentException(OZONE_SCM_CLIENT_ADDRESS_KEY +
" must be defined. See" +
" https://wiki.apache.org/hadoop/Ozone#Configuration for details" +
" on configuring Ozone.");
}
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
OZONE_SCM_DATANODE_ADDRESS_KEY);
return NetUtils.createSocketAddr(host.get() + ":" +
port.or(OZONE_SCM_DATANODE_PORT_DEFAULT));
}
/**
* Retrieve the socket address that should be used by clients to connect
* to the SCM.
*
* @param conf
* @return Target InetSocketAddress for the SCM client endpoint.
*/
public static InetSocketAddress getScmClientBindAddress(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
OZONE_SCM_CLIENT_BIND_HOST_KEY);
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
OZONE_SCM_CLIENT_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":" +
port.or(OZONE_SCM_CLIENT_PORT_DEFAULT));
}
/**
* Retrieve the socket address that should be used by DataNodes to connect
* to the SCM.
*
* @param conf
* @return Target InetSocketAddress for the SCM service endpoint.
*/
public static InetSocketAddress getScmDataNodeBindAddress(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
OZONE_SCM_DATANODE_BIND_HOST_KEY);
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
OZONE_SCM_DATANODE_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" +
port.or(OZONE_SCM_DATANODE_PORT_DEFAULT));
}
/**
* Retrieve the hostname, trying the supplied config keys in order.
* Each config value may be absent, or if present in the format
* host:port (the :port part is optional).
*
* @param conf
* @param keys a list of configuration key names.
*
* @return first hostname component found from the given keys, or absent.
* @throws IllegalArgumentException if any values are not in the 'host'
* or host:port format.
*/
static Optional<String> getHostNameFromConfigKeys(
Configuration conf, String ... keys) {
for (final String key : keys) {
final String value = conf.getTrimmed(key);
if (value != null && !value.isEmpty()) {
String[] splits = value.split(":");
if(splits.length < 1 || splits.length > 2) {
throw new IllegalArgumentException(
"Invalid value " + value + " for config key " + key +
". It should be in 'host' or 'host:port' format");
}
return Optional.of(splits[0]);
}
}
return Optional.absent();
}
/**
* Retrieve the port number, trying the supplied config keys in order.
* Each config value may be absent, or if present in the format
* host:port (the :port part is optional).
*
* @param conf
* @param keys a list of configuration key names.
*
* @return first port number component found from the given keys, or absent.
* @throws IllegalArgumentException if any values are not in the 'host'
* or host:port format.
*/
static Optional<Integer> getPortNumberFromConfigKeys(
Configuration conf, String ... keys) {
for (final String key : keys) {
final String value = conf.getTrimmed(key);
if (value != null && !value.isEmpty()) {
String[] splits = value.split(":");
if(splits.length < 1 || splits.length > 2) {
throw new IllegalArgumentException(
"Invalid value " + value + " for config key " + key +
". It should be in 'host' or 'host:port' format");
}
if (splits.length == 2) {
return Optional.of(Integer.parseInt(splits[1]));
}
}
}
return Optional.absent();
}
/**
* Return the list of service addresses for the Ozone SCM. This method is used
* by the DataNodes to determine the service instances to connect to.
*
* @param conf
* @return list of SCM service addresses.
*/
public static Map<String, ? extends Map<String, InetSocketAddress>>
getScmServiceRpcAddresses(Configuration conf) {
final Map<String, InetSocketAddress> serviceInstances = new HashMap<>();
serviceInstances.put(OZONE_SCM_SERVICE_INSTANCE_ID,
getScmAddressForDataNodes(conf));
final Map<String, Map<String, InetSocketAddress>> services =
new HashMap<>();
services.put(OZONE_SCM_SERVICE_ID, serviceInstances);
return services;
}
}

View File

@ -19,15 +19,17 @@
package org.apache.hadoop.ozone;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This class contains constants for configuration keys used in Ozone.
*/
@InterfaceAudience.Private
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class OzoneConfigKeys {
public static final String DFS_CONTAINER_IPC_PORT =
"dfs.container.ipc";
public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
public static final String OZONE_LOCALSTORAGE_ROOT =
"ozone.localstorage.root";
public static final String OZONE_LOCALSTORAGE_ROOT_DEFAULT = "/tmp/ozone";
@ -37,16 +39,6 @@ public final class OzoneConfigKeys {
public static final String OZONE_HANDLER_TYPE_KEY =
"ozone.handler.type";
public static final String OZONE_HANDLER_TYPE_DEFAULT = "distributed";
public static final String DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY =
"dfs.container.location.rpc-address";
public static final int DFS_CONTAINER_LOCATION_RPC_DEFAULT_PORT = 50200;
public static final String DFS_CONTAINER_LOCATION_RPC_ADDRESS_DEFAULT =
"0.0.0.0:" + DFS_CONTAINER_LOCATION_RPC_DEFAULT_PORT;
public static final String DFS_CONTAINER_LOCATION_RPC_BIND_HOST_KEY =
"dfs.container.rpc-bind-host";
public static final String DFS_CONTAINER_LOCATION_HANDLER_COUNT_KEY =
"dfs.container.handler.count";
public static final int DFS_CONTAINER_HANDLER_COUNT_DEFAULT = 10;
public static final String OZONE_TRACE_ENABLED_KEY =
"ozone.trace.enabled";
public static final boolean OZONE_TRACE_ENABLED_DEFAULT = false;
@ -57,7 +49,26 @@ public final class OzoneConfigKeys {
public static final String OZONE_KEY_CACHE = "ozone.key.cache.size";
public static final int OZONE_KEY_CACHE_DEFAULT = 1024;
public static final int OZONE_SCM_CLIENT_PORT_DEFAULT = 9860;
public static final int OZONE_SCM_DATANODE_PORT_DEFAULT = 9861;
public static final String OZONE_SCM_CLIENT_ADDRESS_KEY =
"ozone.scm.client.address";
public static final String OZONE_SCM_CLIENT_BIND_HOST_KEY =
"ozone.scm.client.bind.host";
public static final String OZONE_SCM_CLIENT_BIND_HOST_DEFAULT =
"0.0.0.0";
public static final String OZONE_SCM_DATANODE_ADDRESS_KEY =
"ozone.scm.datanode.address";
public static final String OZONE_SCM_DATANODE_BIND_HOST_KEY =
"ozone.scm.datanode.bind.host";
public static final String OZONE_SCM_DATANODE_BIND_HOST_DEFAULT =
"0.0.0.0";
public static final String OZONE_SCM_HANDLER_COUNT_KEY =
"ozone.scm.handler.count.key";
public static final int OZONE_SCM_HANDLER_COUNT_DEFAULT = 10;
/**
* There is no need to instantiate this class.

View File

@ -20,20 +20,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_HANDLER_COUNT_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_RPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
@ -47,19 +34,21 @@
import com.google.common.collect.Sets;
import com.google.protobuf.BlockingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.CreateContainerRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -73,7 +62,6 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -88,8 +76,8 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
@ -128,17 +116,13 @@ public class StorageContainerManager
private Pipeline singlePipeline;
/** The RPC server that listens to requests from DataNodes. */
private final RPC.Server serviceRpcServer;
private final InetSocketAddress serviceRpcAddress;
private final RPC.Server datanodeRpcServer;
private final InetSocketAddress datanodeRpcAddress;
/** The RPC server that listens to requests from clients. */
private final RPC.Server clientRpcServer;
private final InetSocketAddress clientRpcAddress;
/** The RPC server that listens to requests from nodes to find containers. */
private final RPC.Server storageRpcServer;
private final InetSocketAddress storageRpcAddress;
/**
* Creates a new StorageContainerManager. Configuration will be updated with
* information on the actual listening addresses used for RPC servers.
@ -157,58 +141,38 @@ public StorageContainerManager(OzoneConfiguration conf)
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
final int handlerCount = conf.getInt(
OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT);
final int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
BlockingService dnProtoPbService =
DatanodeProtocolProtos
.DatanodeProtocolService
.newReflectiveBlockingService(
BlockingService dnProtoPbService = DatanodeProtocolProtos.
DatanodeProtocolService.newReflectiveBlockingService(
new DatanodeProtocolServerSideTranslatorPB(this, maxDataLength));
InetSocketAddress serviceRpcAddr = NameNode.getServiceAddress(conf, false);
serviceRpcServer = startRpcServer(conf, serviceRpcAddr,
DatanodeProtocolPB.class, dnProtoPbService,
DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
serviceRpcAddress = updateListenAddress(conf,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, serviceRpcAddr, serviceRpcServer);
final InetSocketAddress datanodeRpcAddr =
OzoneClientUtils.getScmDataNodeBindAddress(conf);
datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr,
DatanodeProtocolPB.class, dnProtoPbService, handlerCount);
datanodeRpcAddress = updateListenAddress(conf,
OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
LOG.info(buildRpcServerStartMessage("Service RPC server",
serviceRpcAddress));
InetSocketAddress rpcAddr = DFSUtilClient.getNNAddress(conf);
clientRpcServer = startRpcServer(conf, rpcAddr,
DatanodeProtocolPB.class, dnProtoPbService,
DFS_NAMENODE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_HANDLER_COUNT_KEY,
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
clientRpcAddress = updateListenAddress(conf,
DFS_NAMENODE_RPC_ADDRESS_KEY, rpcAddr, clientRpcServer);
conf.set(FS_DEFAULT_NAME_KEY, DFSUtilClient.getNNUri(clientRpcAddress)
.toString());
LOG.info(buildRpcServerStartMessage("RPC server", clientRpcAddress));
datanodeRpcAddress));
BlockingService storageProtoPbService =
StorageContainerLocationProtocolProtos
.StorageContainerLocationProtocolService
.newReflectiveBlockingService(
new StorageContainerLocationProtocolServerSideTranslatorPB(this));
.StorageContainerLocationProtocolService
.newReflectiveBlockingService(
new StorageContainerLocationProtocolServerSideTranslatorPB(this));
InetSocketAddress storageRpcAddr = NetUtils.createSocketAddr(
conf.getTrimmed(DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY,
DFS_CONTAINER_LOCATION_RPC_ADDRESS_DEFAULT),
-1, DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY);
storageRpcServer = startRpcServer(conf, storageRpcAddr,
final InetSocketAddress clientRpcAddr =
OzoneClientUtils.getScmClientBindAddress(conf);
clientRpcServer = startRpcServer(conf, clientRpcAddr,
StorageContainerLocationProtocolPB.class, storageProtoPbService,
DFS_CONTAINER_LOCATION_RPC_BIND_HOST_KEY,
DFS_CONTAINER_LOCATION_HANDLER_COUNT_KEY,
DFS_CONTAINER_HANDLER_COUNT_DEFAULT);
storageRpcAddress = updateListenAddress(conf,
DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY,
storageRpcAddr, storageRpcServer);
handlerCount);
clientRpcAddress = updateListenAddress(conf,
OZONE_SCM_CLIENT_ADDRESS_KEY, clientRpcAddr, clientRpcServer);
LOG.info(buildRpcServerStartMessage(
"StorageContainerLocationProtocol RPC server", storageRpcAddress));
"StorageContainerLocationProtocol RPC server", clientRpcAddress));
}
@Override
@ -216,7 +180,7 @@ public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
throws IOException {
LOG.trace("getStorageContainerLocations keys = {}", keys);
Pipeline pipeline = initSingleContainerPipeline();
List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
List<DatanodeDescriptor> liveNodes = new ArrayList<>();
blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
if (liveNodes.isEmpty()) {
throw new IOException("Storage container locations not found.");
@ -385,13 +349,13 @@ public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) {
}
/**
* Returns listen address of StorageContainerLocation RPC server.
* Returns listen address of client RPC server.
*
* @return listen address of StorageContainerLocation RPC server
* @return listen address of client RPC server
*/
@VisibleForTesting
public InetSocketAddress getStorageContainerLocationRpcAddress() {
return storageRpcAddress;
public InetSocketAddress getClientRpcAddress() {
return clientRpcAddress;
}
/**
@ -399,10 +363,7 @@ public InetSocketAddress getStorageContainerLocationRpcAddress() {
*/
public void start() {
clientRpcServer.start();
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
storageRpcServer.start();
datanodeRpcServer.start();
}
/**
@ -412,11 +373,8 @@ public void stop() {
if (clientRpcServer != null) {
clientRpcServer.stop();
}
if (serviceRpcServer != null) {
serviceRpcServer.stop();
}
if (storageRpcServer != null) {
storageRpcServer.stop();
if (datanodeRpcServer != null) {
datanodeRpcServer.stop();
}
IOUtils.closeStream(ns);
}
@ -427,10 +385,7 @@ public void stop() {
public void join() {
try {
clientRpcServer.join();
if (serviceRpcServer != null) {
serviceRpcServer.join();
}
storageRpcServer.join();
datanodeRpcServer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("Interrupted during StorageContainerManager join.");
@ -497,7 +452,7 @@ private synchronized Pipeline initSingleContainerPipeline()
private static String buildRpcServerStartMessage(String description,
InetSocketAddress addr) {
return addr != null ? String.format("%s is listening at %s",
description, NetUtils.getHostPortString(addr)) :
description, addr.getHostString() + ":" + addr.getPort()) :
String.format("%s not started", description);
}
@ -527,59 +482,47 @@ private static Pipeline newPipelineFromNodes(List<DatanodeDescriptor> nodes,
* @param addr configured address of RPC server
* @param protocol RPC protocol provided by RPC server
* @param instance RPC protocol implementation instance
* @param bindHostKey configuration key for setting explicit bind host. If
* the property is not configured, then the bind host is taken from addr.
* @param handlerCountKey configuration key for RPC server handler count
* @param handlerCountDefault default RPC server handler count if unconfigured
* @return RPC server, or null if addr is null
* @param handlerCount RPC server handler count
*
* @return RPC server
* @throws IOException if there is an I/O error while creating RPC server
*/
private static RPC.Server startRpcServer(OzoneConfiguration conf,
InetSocketAddress addr, Class<?> protocol, BlockingService instance,
String bindHostKey, String handlerCountKey, int handlerCountDefault)
int handlerCount)
throws IOException {
if (addr == null) {
return null;
}
String bindHost = conf.getTrimmed(bindHostKey);
if (bindHost == null || bindHost.isEmpty()) {
bindHost = addr.getHostName();
}
int numHandlers = conf.getInt(handlerCountKey, handlerCountDefault);
RPC.Server rpcServer = new RPC.Builder(conf)
.setProtocol(protocol)
.setInstance(instance)
.setBindAddress(bindHost)
.setBindAddress(addr.getHostString())
.setPort(addr.getPort())
.setNumHandlers(numHandlers)
.setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(null)
.build();
DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
return rpcServer;
}
/**
* After starting an RPC server, updates configuration with the actual
* listening address of that server. The listening address may be different
* listening address of that server. The listening address may be different
* from the configured address if, for example, the configured address uses
* port 0 to request use of an ephemeral port.
*
* @param conf configuration to update
* @param rpcAddressKey configuration key for RPC server address
* @param addr configured address
* @param rpcServer started RPC server. If null, then the server was not
* started, and this method is a no-op.
* @param rpcServer started RPC server.
*/
private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
if (rpcServer == null) {
return null;
}
InetSocketAddress listenAddr = rpcServer.getListenerAddress();
InetSocketAddress updatedAddr = new InetSocketAddress(
addr.getHostName(), listenAddr.getPort());
conf.set(rpcAddressKey, NetUtils.getHostPortString(updatedAddr));
addr.getHostString(), listenAddr.getPort());
conf.set(rpcAddressKey,
addr.getHostString() + ":" + listenAddr.getPort());
return updatedAddr;
}

View File

@ -26,6 +26,7 @@
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.ozShell.bucket.UpdateBucketHandler;
import org.apache.hadoop.ozone.web.ozShell.keys.DeleteKeyHandler;
@ -97,7 +98,7 @@ public class Shell extends Configured implements Tool {
*/
public static void main(String[] argv) throws Exception {
Shell shell = new Shell();
Configuration conf = new Configuration();
Configuration conf = new OzoneConfiguration();
conf.setQuietMode(false);
shell.setConf(conf);
int res = 0;

View File

@ -23,5 +23,91 @@
<!-- there. If ozone-site.xml does not already exist, create it. -->
<configuration>
<property>
<name>ozone.enabled</name>
<value>false</value>
<description>
Status of the Ozone Object Storage service is enabled.
Set to true to enable Ozone.
Set to false to disable Ozone.
</description>
</property>
<property>
<name>ozone.handler.type</name>
<value>local</value>
<description>
The second key dfs.storage.handler.type tells ozone which storage
handler to use. The possible values are:
ozone - The Ozone distributed storage handler.
local - Local Storage handler strictly for testing.
</description>
</property>
<property>
<name>ozone.scm.client.address</name>
<value></value>
<description>
The address of the Ozone SCM client service. This is a required
setting.
It is a string in the host:port format. The port number is optional
and defaults to 9860.
</description>
</property>
<property>
<name>ozone.scm.datanode.address</name>
<value></value>
<description>
The address of the Ozone SCM service used for internal communication
between the DataNodes and the SCM.
It is a string in the host:port format. The port number is optional
and defaults to 9861.
This setting is optional. If unspecified then the hostname portion
is picked from the ozone.scm.client.address setting and the
default service port of 9861 is chosen.
</description>
</property>
<property>
<name>ozone.scm.client.bind.host</name>
<value>0.0.0.0</value>
<description>
The hostname or IP address used by the SCM client endpoint to bind.
This setting is used by the SCM only and never used by clients.
The setting can be useful in multi-homed setups to restrict the
availability of the SCM client service to a specific interface.
The default is appropriate for most clusters.
</description>
</property>
<property>
<name>ozone.scm.internal.bind.host</name>
<value>0.0.0.0</value>
<description>
The hostname or IP address used by the SCM DataNode handler service
to bind. This setting is used by the SCM only and never used by
clients or Data Nodes.
The setting can be useful in multi-homed setups to restrict the
availability of the SCM to a specific interface.
The default is appropriate for most clusters.
</description>
</property>
<property>
<name>ozone.scm.handler.count.key</name>
<value>20</value>
<description>
The number of RPC handler threads for each SCM service endpoint.
The default is appropriate for small clusters (tens of nodes).
</description>
</property>
</configuration>

View File

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.ozone;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -105,11 +102,8 @@ public Builder numDataNodes(int val) {
@Override
public MiniOzoneCluster build() throws IOException {
// Even though this won't start a NameNode, some of the logic in
// MiniDFSCluster expects to find the default file system configured with
// an HDFS URI.
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:0");
conf.set(DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY, "127.0.0.1:0");
conf.set(OzoneConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
StorageContainerManager scm = new StorageContainerManager(conf);
scm.start();
return new MiniOzoneCluster(this, scm);
@ -169,7 +163,7 @@ public OzoneClient createOzoneClient() throws OzoneException {
createStorageContainerLocationClient() throws IOException {
long version = RPC.getProtocolVersion(
StorageContainerLocationProtocolPB.class);
InetSocketAddress address = scm.getStorageContainerLocationRpcAddress();
InetSocketAddress address = scm.getClientRpcAddress();
LOG.info(
"Creating StorageContainerLocationProtocol RPC client with address {}",
address);

View File

@ -0,0 +1,220 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.Rule;
import org.junit.rules.Timeout;
import java.net.InetSocketAddress;
import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
/**
* This test class verifies the parsing of SCM endpoint config settings.
* The parsing logic is in {@link OzoneClientUtils}.
*/
public class TestOzoneClientUtils {
@Rule
public Timeout timeout = new Timeout(300000);
@Rule
public ExpectedException thrown= ExpectedException.none();
/**
* Verify client endpoint lookup failure if it is not configured.
*/
@Test
public void testMissingScmClientAddress() {
final Configuration conf = new OzoneConfiguration();
thrown.expect(IllegalArgumentException.class);
OzoneClientUtils.getScmAddressForClients(conf);
}
/**
* Verify that the client endpoint can be correctly parsed from
* configuration.
*/
@Test
public void testGetScmClientAddress() {
final Configuration conf = new OzoneConfiguration();
// First try a client address with just a host name. Verify it falls
// back to the default port.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
InetSocketAddress addr = OzoneClientUtils.getScmAddressForClients(conf);
assertThat(addr.getHostString(), is("1.2.3.4"));
assertThat(addr.getPort(), is(OZONE_SCM_CLIENT_PORT_DEFAULT));
// Next try a client address with a host name and port. Verify both
// are used correctly.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
addr = OzoneClientUtils.getScmAddressForClients(conf);
assertThat(addr.getHostString(), is("1.2.3.4"));
assertThat(addr.getPort(), is(100));
}
/**
* Verify DataNode endpoint lookup failure if neither the client nor
* datanode endpoint are configured.
*/
@Test
public void testMissingScmDataNodeAddress() {
final Configuration conf = new OzoneConfiguration();
thrown.expect(IllegalArgumentException.class);
OzoneClientUtils.getScmAddressForDataNodes(conf);
}
/**
* Verify that the datanode endpoint is parsed correctly.
* This tests the logic used by the DataNodes to determine which address
* to connect to.
*/
@Test
public void testGetScmDataNodeAddress() {
final Configuration conf = new OzoneConfiguration();
// First try a client address with just a host name. Verify it falls
// back to the default port.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
InetSocketAddress addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
assertThat(addr.getHostString(), is("1.2.3.4"));
assertThat(addr.getPort(), is(OZONE_SCM_DATANODE_PORT_DEFAULT));
// Next try a client address with just a host name and port. Verify the port
// is ignored and the default DataNode port is used.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
assertThat(addr.getHostString(), is("1.2.3.4"));
assertThat(addr.getPort(), is(OZONE_SCM_DATANODE_PORT_DEFAULT));
// Set both OZONE_SCM_CLIENT_ADDRESS_KEY and OZONE_SCM_DATANODE_ADDRESS_KEY.
// Verify that the latter overrides and the port number is still the default.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
conf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, "5.6.7.8");
addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
assertThat(addr.getHostString(), is("5.6.7.8"));
assertThat(addr.getPort(), is(OZONE_SCM_DATANODE_PORT_DEFAULT));
// Set both OZONE_SCM_CLIENT_ADDRESS_KEY and OZONE_SCM_DATANODE_ADDRESS_KEY.
// Verify that the latter overrides and the port number from the latter is
// used.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
conf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, "5.6.7.8:200");
addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
assertThat(addr.getHostString(), is("5.6.7.8"));
assertThat(addr.getPort(), is(200));
}
/**
* Verify that the client endpoint bind address is computed correctly.
* This tests the logic used by the SCM to determine its own bind address.
*/
@Test
public void testScmClientBindHostDefault() {
final Configuration conf = new OzoneConfiguration();
// The bind host should be 0.0.0.0 unless OZONE_SCM_CLIENT_BIND_HOST_KEY
// is set differently.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
InetSocketAddress addr = OzoneClientUtils.getScmClientBindAddress(conf);
assertThat(addr.getHostString(), is("0.0.0.0"));
assertThat(addr.getPort(), is(OZONE_SCM_CLIENT_PORT_DEFAULT));
// The bind host should be 0.0.0.0 unless OZONE_SCM_CLIENT_BIND_HOST_KEY
// is set differently. The port number from OZONE_SCM_CLIENT_ADDRESS_KEY
// should be respected.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
conf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
addr = OzoneClientUtils.getScmClientBindAddress(conf);
assertThat(addr.getHostString(), is("0.0.0.0"));
assertThat(addr.getPort(), is(100));
// OZONE_SCM_CLIENT_BIND_HOST_KEY should be respected.
// Port number should be default if none is specified via
// OZONE_SCM_DATANODE_ADDRESS_KEY.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
conf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4");
conf.set(OZONE_SCM_CLIENT_BIND_HOST_KEY, "5.6.7.8");
addr = OzoneClientUtils.getScmClientBindAddress(conf);
assertThat(addr.getHostString(), is("5.6.7.8"));
assertThat(addr.getPort(), is(OZONE_SCM_CLIENT_PORT_DEFAULT));
// OZONE_SCM_CLIENT_BIND_HOST_KEY should be respected.
// Port number from OZONE_SCM_CLIENT_ADDRESS_KEY should be
// respected.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
conf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
conf.set(OZONE_SCM_CLIENT_BIND_HOST_KEY, "5.6.7.8");
addr = OzoneClientUtils.getScmClientBindAddress(conf);
assertThat(addr.getHostString(), is("5.6.7.8"));
assertThat(addr.getPort(), is(100));
}
/**
* Verify that the DataNode endpoint bind address is computed correctly.
* This tests the logic used by the SCM to determine its own bind address.
*/
@Test
public void testScmDataNodeBindHostDefault() {
final Configuration conf = new OzoneConfiguration();
// The bind host should be 0.0.0.0 unless OZONE_SCM_DATANODE_BIND_HOST_KEY
// is set differently.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
InetSocketAddress addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
assertThat(addr.getHostString(), is("0.0.0.0"));
assertThat(addr.getPort(), is(OZONE_SCM_DATANODE_PORT_DEFAULT));
// The bind host should be 0.0.0.0 unless OZONE_SCM_DATANODE_BIND_HOST_KEY
// is set differently. The port number from OZONE_SCM_DATANODE_ADDRESS_KEY
// should be respected.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
conf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
assertThat(addr.getHostString(), is("0.0.0.0"));
assertThat(addr.getPort(), is(200));
// OZONE_SCM_DATANODE_BIND_HOST_KEY should be respected.
// Port number should be default if none is specified via
// OZONE_SCM_DATANODE_ADDRESS_KEY.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
conf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4");
conf.set(OZONE_SCM_DATANODE_BIND_HOST_KEY, "5.6.7.8");
addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
assertThat(addr.getHostString(), is("5.6.7.8"));
assertThat(addr.getPort(), is(OZONE_SCM_DATANODE_PORT_DEFAULT));
// OZONE_SCM_DATANODE_BIND_HOST_KEY should be respected.
// Port number from OZONE_SCM_DATANODE_ADDRESS_KEY should be
// respected.
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
conf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
conf.set(OZONE_SCM_DATANODE_BIND_HOST_KEY, "5.6.7.8");
addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
assertThat(addr.getHostString(), is("5.6.7.8"));
assertThat(addr.getPort(), is(200));
}
}