HDDS-2007. Make ozone fs shell command work with OM HA service ids (#1360)
This commit is contained in:
parent
1843c4688a
commit
e22a324f87
|
@ -110,6 +110,35 @@ public final class OzoneClientFactory {
|
|||
return getRpcClient(omHost, omRpcPort, new OzoneConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an OzoneClient which will use RPC protocol.
|
||||
*
|
||||
* @param omHost
|
||||
* hostname of OzoneManager to connect.
|
||||
*
|
||||
* @param omRpcPort
|
||||
* RPC port of OzoneManager.
|
||||
*
|
||||
* @param omServiceId
|
||||
* Service ID of OzoneManager HA cluster.
|
||||
*
|
||||
* @param config
|
||||
* Configuration to be used for OzoneClient creation
|
||||
*
|
||||
* @return OzoneClient
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public static OzoneClient getRpcClient(String omHost, Integer omRpcPort,
|
||||
String omServiceId, Configuration config) throws IOException {
|
||||
Preconditions.checkNotNull(omHost);
|
||||
Preconditions.checkNotNull(omRpcPort);
|
||||
Preconditions.checkNotNull(omServiceId);
|
||||
Preconditions.checkNotNull(config);
|
||||
config.set(OZONE_OM_ADDRESS_KEY, omHost + ":" + omRpcPort);
|
||||
return getRpcClient(omServiceId, config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an OzoneClient which will use RPC protocol.
|
||||
*
|
||||
|
@ -136,6 +165,28 @@ public final class OzoneClientFactory {
|
|||
return getRpcClient(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an OzoneClient which will use RPC protocol.
|
||||
*
|
||||
* @param omServiceId
|
||||
* Service ID of OzoneManager HA cluster.
|
||||
*
|
||||
* @param config
|
||||
* Configuration to be used for OzoneClient creation
|
||||
*
|
||||
* @return OzoneClient
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public static OzoneClient getRpcClient(String omServiceId,
|
||||
Configuration config) throws IOException {
|
||||
Preconditions.checkNotNull(omServiceId);
|
||||
Preconditions.checkNotNull(config);
|
||||
// Won't set OZONE_OM_ADDRESS_KEY here since service id is passed directly,
|
||||
// leaving OZONE_OM_ADDRESS_KEY value as is.
|
||||
return getClient(getClientProtocol(config, omServiceId), config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an OzoneClient which will use RPC protocol.
|
||||
*
|
||||
|
@ -185,8 +236,24 @@ public final class OzoneClientFactory {
|
|||
*/
|
||||
private static ClientProtocol getClientProtocol(Configuration config)
|
||||
throws IOException {
|
||||
return getClientProtocol(config, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an instance of Protocol class.
|
||||
*
|
||||
*
|
||||
* @param config
|
||||
* Configuration used to initialize ClientProtocol.
|
||||
*
|
||||
* @return ClientProtocol
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private static ClientProtocol getClientProtocol(Configuration config,
|
||||
String omServiceId) throws IOException {
|
||||
try {
|
||||
return new RpcClient(config);
|
||||
return new RpcClient(config, omServiceId);
|
||||
} catch (Exception e) {
|
||||
final String message = "Couldn't create RpcClient protocol";
|
||||
LOG.error(message + " exception: ", e);
|
||||
|
|
|
@ -144,10 +144,11 @@ public class RpcClient implements ClientProtocol {
|
|||
|
||||
/**
|
||||
* Creates RpcClient instance with the given configuration.
|
||||
* @param conf
|
||||
* @param conf Configuration
|
||||
* @param omServiceId OM HA Service ID, set this to null if not HA
|
||||
* @throws IOException
|
||||
*/
|
||||
public RpcClient(Configuration conf) throws IOException {
|
||||
public RpcClient(Configuration conf, String omServiceId) throws IOException {
|
||||
Preconditions.checkNotNull(conf);
|
||||
this.conf = new OzoneConfiguration(conf);
|
||||
this.ugi = UserGroupInformation.getCurrentUser();
|
||||
|
@ -158,7 +159,7 @@ public class RpcClient implements ClientProtocol {
|
|||
|
||||
this.ozoneManagerClient = TracingUtil.createProxy(
|
||||
new OzoneManagerProtocolClientSideTranslatorPB(
|
||||
this.conf, clientId.toString(), ugi),
|
||||
this.conf, clientId.toString(), omServiceId, ugi),
|
||||
OzoneManagerProtocol.class, conf
|
||||
);
|
||||
long scmVersion =
|
||||
|
|
|
@ -60,6 +60,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_KE
|
|||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_PORT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_PORT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -136,6 +137,29 @@ public final class OmUtils {
|
|||
host.get() + ":" + getOmRpcPort(conf));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if OZONE_OM_SERVICE_IDS_KEY is defined and not empty.
|
||||
* @param conf Configuration
|
||||
* @return true if OZONE_OM_SERVICE_IDS_KEY is defined and not empty;
|
||||
* else false.
|
||||
*/
|
||||
public static boolean isServiceIdsDefined(Configuration conf) {
|
||||
String val = conf.get(OZONE_OM_SERVICE_IDS_KEY);
|
||||
return val != null && val.length() > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if HA for OzoneManager is configured for the given service id.
|
||||
* @param conf Configuration
|
||||
* @param serviceId OM HA cluster service ID
|
||||
* @return true if HA is configured in the configuration; else false.
|
||||
*/
|
||||
public static boolean isOmHAServiceId(Configuration conf, String serviceId) {
|
||||
Collection<String> omServiceIds = conf.getTrimmedStringCollection(
|
||||
OZONE_OM_SERVICE_IDS_KEY);
|
||||
return omServiceIds.contains(serviceId);
|
||||
}
|
||||
|
||||
public static int getOmRpcPort(Configuration conf) {
|
||||
// If no port number is specified then we'll just try the defaultBindPort.
|
||||
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
|
||||
|
|
|
@ -39,12 +39,12 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
|
||||
|
||||
/**
|
||||
* A failover proxy provider implementation which allows clients to configure
|
||||
|
@ -70,31 +70,33 @@ public class OMFailoverProxyProvider implements
|
|||
private final UserGroupInformation ugi;
|
||||
private final Text delegationTokenService;
|
||||
|
||||
private final String omServiceId;
|
||||
|
||||
public OMFailoverProxyProvider(OzoneConfiguration configuration,
|
||||
UserGroupInformation ugi) throws IOException {
|
||||
UserGroupInformation ugi, String omServiceId) throws IOException {
|
||||
this.conf = configuration;
|
||||
this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
|
||||
this.ugi = ugi;
|
||||
loadOMClientConfigs(conf);
|
||||
this.omServiceId = omServiceId;
|
||||
loadOMClientConfigs(conf, this.omServiceId);
|
||||
this.delegationTokenService = computeDelegationTokenService();
|
||||
|
||||
currentProxyIndex = 0;
|
||||
currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
|
||||
}
|
||||
|
||||
private void loadOMClientConfigs(Configuration config) throws IOException {
|
||||
public OMFailoverProxyProvider(OzoneConfiguration configuration,
|
||||
UserGroupInformation ugi) throws IOException {
|
||||
this(configuration, ugi, null);
|
||||
}
|
||||
|
||||
private void loadOMClientConfigs(Configuration config, String omSvcId)
|
||||
throws IOException {
|
||||
this.omProxies = new HashMap<>();
|
||||
this.omProxyInfos = new HashMap<>();
|
||||
this.omNodeIDList = new ArrayList<>();
|
||||
|
||||
Collection<String> omServiceIds = config.getTrimmedStringCollection(
|
||||
OZONE_OM_SERVICE_IDS_KEY);
|
||||
|
||||
if (omServiceIds.size() > 1) {
|
||||
throw new IllegalArgumentException("Multi-OM Services is not supported." +
|
||||
" Please configure only one OM Service ID in " +
|
||||
OZONE_OM_SERVICE_IDS_KEY);
|
||||
}
|
||||
Collection<String> omServiceIds = Collections.singletonList(omSvcId);
|
||||
|
||||
for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
|
||||
Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
|
||||
|
|
|
@ -191,8 +191,10 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
|
|||
* cluster.
|
||||
*/
|
||||
public OzoneManagerProtocolClientSideTranslatorPB(OzoneConfiguration conf,
|
||||
String clientId, UserGroupInformation ugi) throws IOException {
|
||||
this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi);
|
||||
String clientId, String omServiceId, UserGroupInformation ugi)
|
||||
throws IOException {
|
||||
this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi,
|
||||
omServiceId);
|
||||
|
||||
int maxRetries = conf.getInt(
|
||||
OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
|
||||
|
|
|
@ -14,10 +14,13 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
OZONE-SITE.XML_ozone.om.nodes=om1,om2,om3
|
||||
OZONE-SITE.XML_ozone.om.address.om1=om1
|
||||
OZONE-SITE.XML_ozone.om.address.om2=om2
|
||||
OZONE-SITE.XML_ozone.om.address.om3=om3
|
||||
CORE-SITE.XML_fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem
|
||||
CORE-SITE.XML_fs.defaultFS=o3fs://bucket.volume.id1
|
||||
OZONE-SITE.XML_ozone.om.service.ids=id1
|
||||
OZONE-SITE.XML_ozone.om.nodes.id1=om1,om2,om3
|
||||
OZONE-SITE.XML_ozone.om.address.id1.om1=om1
|
||||
OZONE-SITE.XML_ozone.om.address.id1.om2=om2
|
||||
OZONE-SITE.XML_ozone.om.address.id1.om3=om3
|
||||
OZONE-SITE.XML_ozone.om.ratis.enable=true
|
||||
OZONE-SITE.XML_ozone.scm.names=scm
|
||||
OZONE-SITE.XML_ozone.enabled=True
|
||||
|
|
|
@ -96,6 +96,13 @@ public interface MiniOzoneCluster {
|
|||
*/
|
||||
void waitTobeOutOfSafeMode() throws TimeoutException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Returns OzoneManager Service ID.
|
||||
*
|
||||
* @return Service ID String
|
||||
*/
|
||||
String getServiceId();
|
||||
|
||||
/**
|
||||
* Returns {@link StorageContainerManager} associated with this
|
||||
* {@link MiniOzoneCluster} instance.
|
||||
|
|
|
@ -128,6 +128,11 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
|
|||
return conf;
|
||||
}
|
||||
|
||||
public String getServiceId() {
|
||||
// Non-HA cluster doesn't have OM Service Id.
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for the Ozone cluster to be ready for processing requests.
|
||||
*/
|
||||
|
|
|
@ -22,6 +22,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
|||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.ozone.client.OzoneClient;
|
||||
import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.apache.hadoop.ozone.om.OMStorage;
|
||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||
|
@ -52,6 +54,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
|
|||
|
||||
private Map<String, OzoneManager> ozoneManagerMap;
|
||||
private List<OzoneManager> ozoneManagers;
|
||||
private String omServiceId;
|
||||
|
||||
// Active OMs denote OMs which are up and running
|
||||
private List<OzoneManager> activeOMs;
|
||||
|
@ -74,12 +77,19 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
|
|||
List<OzoneManager> activeOMList,
|
||||
List<OzoneManager> inactiveOMList,
|
||||
StorageContainerManager scm,
|
||||
List<HddsDatanodeService> hddsDatanodes) {
|
||||
List<HddsDatanodeService> hddsDatanodes,
|
||||
String omServiceId) {
|
||||
super(conf, scm, hddsDatanodes);
|
||||
this.ozoneManagerMap = omMap;
|
||||
this.ozoneManagers = new ArrayList<>(omMap.values());
|
||||
this.activeOMs = activeOMList;
|
||||
this.inactiveOMs = inactiveOMList;
|
||||
this.omServiceId = omServiceId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getServiceId() {
|
||||
return omServiceId;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -91,6 +101,11 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
|
|||
return this.ozoneManagers.get(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public OzoneClient getRpcClient() throws IOException {
|
||||
return OzoneClientFactory.getRpcClient(getServiceId(), getConf());
|
||||
}
|
||||
|
||||
public boolean isOMActive(String omNodeId) {
|
||||
return activeOMs.contains(ozoneManagerMap.get(omNodeId));
|
||||
}
|
||||
|
@ -188,7 +203,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
|
|||
|
||||
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
|
||||
MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(
|
||||
conf, omMap, activeOMs, inactiveOMs, scm, hddsDatanodes);
|
||||
conf, omMap, activeOMs, inactiveOMs, scm, hddsDatanodes, omServiceId);
|
||||
if (startDataNodes) {
|
||||
cluster.startHddsDatanodes();
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ public interface RatisTestHelper {
|
|||
|
||||
public ClientProtocol newOzoneClient()
|
||||
throws IOException {
|
||||
return new RpcClient(conf);
|
||||
return new RpcClient(conf, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -949,7 +949,7 @@ public abstract class TestOzoneRpcClientAbstract {
|
|||
Configuration configuration = cluster.getConf();
|
||||
configuration.setBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
|
||||
verifyChecksum);
|
||||
RpcClient client = new RpcClient(configuration);
|
||||
RpcClient client = new RpcClient(configuration, null);
|
||||
OzoneInputStream is = client.getKey(volumeName, bucketName, keyName);
|
||||
is.read(new byte[100]);
|
||||
is.close();
|
||||
|
|
|
@ -52,6 +52,7 @@ public class TestOMRatisSnapshots {
|
|||
private OzoneConfiguration conf;
|
||||
private String clusterId;
|
||||
private String scmId;
|
||||
private String omServiceId;
|
||||
private int numOfOMs = 3;
|
||||
private static final long SNAPSHOT_THRESHOLD = 50;
|
||||
private static final int LOG_PURGE_GAP = 50;
|
||||
|
@ -74,6 +75,7 @@ public class TestOMRatisSnapshots {
|
|||
conf = new OzoneConfiguration();
|
||||
clusterId = UUID.randomUUID().toString();
|
||||
scmId = UUID.randomUUID().toString();
|
||||
omServiceId = "om-service-test1";
|
||||
conf.setLong(
|
||||
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
|
||||
SNAPSHOT_THRESHOLD);
|
||||
|
@ -86,7 +88,8 @@ public class TestOMRatisSnapshots {
|
|||
.setNumOfActiveOMs(2)
|
||||
.build();
|
||||
cluster.waitForClusterToBeReady();
|
||||
objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
|
||||
objectStore = OzoneClientFactory.getRpcClient(omServiceId, conf)
|
||||
.getObjectStore();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -101,6 +101,7 @@ public class TestOzoneManagerHA {
|
|||
private OzoneConfiguration conf;
|
||||
private String clusterId;
|
||||
private String scmId;
|
||||
private String omServiceId;
|
||||
private int numOfOMs = 3;
|
||||
private static final long SNAPSHOT_THRESHOLD = 50;
|
||||
private static final int LOG_PURGE_GAP = 50;
|
||||
|
@ -123,6 +124,7 @@ public class TestOzoneManagerHA {
|
|||
conf = new OzoneConfiguration();
|
||||
clusterId = UUID.randomUUID().toString();
|
||||
scmId = UUID.randomUUID().toString();
|
||||
omServiceId = "om-service-test1";
|
||||
conf.setBoolean(OZONE_ACL_ENABLED, true);
|
||||
conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS,
|
||||
OZONE_ADMINISTRATORS_WILDCARD);
|
||||
|
@ -136,11 +138,12 @@ public class TestOzoneManagerHA {
|
|||
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
|
||||
.setClusterId(clusterId)
|
||||
.setScmId(scmId)
|
||||
.setOMServiceId("om-service-test1")
|
||||
.setOMServiceId(omServiceId)
|
||||
.setNumOfOzoneManagers(numOfOMs)
|
||||
.build();
|
||||
cluster.waitForClusterToBeReady();
|
||||
objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
|
||||
objectStore = OzoneClientFactory.getRpcClient(omServiceId, conf)
|
||||
.getObjectStore();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -758,7 +761,7 @@ public class TestOzoneManagerHA {
|
|||
|
||||
// Get the ObjectStore and FailoverProxyProvider for OM at index i
|
||||
final ObjectStore store = OzoneClientFactory.getRpcClient(
|
||||
omHostName, rpcPort, conf).getObjectStore();
|
||||
omHostName, rpcPort, omServiceId, conf).getObjectStore();
|
||||
final OMFailoverProxyProvider proxyProvider =
|
||||
store.getClientProxy().getOMProxyProvider();
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ public class TestOzoneManagerSnapshotProvider {
|
|||
private OzoneConfiguration conf;
|
||||
private String clusterId;
|
||||
private String scmId;
|
||||
private String omServiceId;
|
||||
private int numOfOMs = 3;
|
||||
|
||||
@Rule
|
||||
|
@ -62,16 +63,18 @@ public class TestOzoneManagerSnapshotProvider {
|
|||
conf = new OzoneConfiguration();
|
||||
clusterId = UUID.randomUUID().toString();
|
||||
scmId = UUID.randomUUID().toString();
|
||||
omServiceId = "om-service-test1";
|
||||
conf.setBoolean(OMConfigKeys.OZONE_OM_HTTP_ENABLED_KEY, true);
|
||||
conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
|
||||
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
|
||||
.setClusterId(clusterId)
|
||||
.setScmId(scmId)
|
||||
.setOMServiceId("om-service-test1")
|
||||
.setOMServiceId(omServiceId)
|
||||
.setNumOfOzoneManagers(numOfOMs)
|
||||
.build();
|
||||
cluster.waitForClusterToBeReady();
|
||||
objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
|
||||
objectStore = OzoneClientFactory.getRpcClient(omServiceId, conf)
|
||||
.getObjectStore();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -111,7 +111,7 @@ public class TestS3Shell {
|
|||
.build();
|
||||
conf.setInt(OZONE_REPLICATION, ReplicationFactor.THREE.getValue());
|
||||
conf.setQuietMode(false);
|
||||
client = new RpcClient(conf);
|
||||
client = new RpcClient(conf, null);
|
||||
cluster.waitForClusterToBeReady();
|
||||
}
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ public class ReconControllerModule extends AbstractModule {
|
|||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
ozoneManagerClient = new
|
||||
OzoneManagerProtocolClientSideTranslatorPB(
|
||||
ozoneConfiguration, clientId.toString(), ugi);
|
||||
ozoneConfiguration, clientId.toString(), null, ugi);
|
||||
} catch (IOException ioEx) {
|
||||
LOG.error("Error in provisioning OzoneManagerProtocol ", ioEx);
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.client.ReplicationType;
|
|||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.client.ObjectStore;
|
||||
import org.apache.hadoop.ozone.client.OzoneBucket;
|
||||
|
@ -115,6 +116,29 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
|
|||
conf = new OzoneConfiguration(hadoopConf);
|
||||
}
|
||||
|
||||
if (omHost == null && OmUtils.isServiceIdsDefined(conf)) {
|
||||
// When the host name or service id isn't given
|
||||
// but ozone.om.service.ids is defined, declare failure.
|
||||
|
||||
// This is a safety precaution that prevents the client from
|
||||
// accidentally failing over to an unintended OM.
|
||||
throw new IllegalArgumentException("Service ID or host name must not"
|
||||
+ " be omitted when ozone.om.service.ids is defined.");
|
||||
}
|
||||
|
||||
if (omPort != -1) {
|
||||
// When the port number is specified, perform the following check
|
||||
if (OmUtils.isOmHAServiceId(conf, omHost)) {
|
||||
// If omHost is a service id, it shouldn't use a port
|
||||
throw new IllegalArgumentException("Port " + omPort +
|
||||
" specified in URI but host '" + omHost + "' is "
|
||||
+ "a logical (HA) OzoneManager and does not use port information.");
|
||||
}
|
||||
} else {
|
||||
// When port number is not specified, read it from config
|
||||
omPort = OmUtils.getOmRpcPort(conf);
|
||||
}
|
||||
|
||||
SecurityConfig secConfig = new SecurityConfig(conf);
|
||||
|
||||
if (secConfig.isSecurityEnabled()) {
|
||||
|
@ -129,7 +153,12 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
|
|||
int replicationCountConf = conf.getInt(OzoneConfigKeys.OZONE_REPLICATION,
|
||||
OzoneConfigKeys.OZONE_REPLICATION_DEFAULT);
|
||||
|
||||
if (StringUtils.isNotEmpty(omHost) && omPort != -1) {
|
||||
if (OmUtils.isOmHAServiceId(conf, omHost)) {
|
||||
// omHost is listed as one of the service ids in the config,
|
||||
// thus we should treat omHost as omServiceId
|
||||
this.ozoneClient =
|
||||
OzoneClientFactory.getRpcClient(omHost, conf);
|
||||
} else if (StringUtils.isNotEmpty(omHost) && omPort != -1) {
|
||||
this.ozoneClient =
|
||||
OzoneClientFactory.getRpcClient(omHost, omPort, conf);
|
||||
} else {
|
||||
|
|
|
@ -112,6 +112,11 @@ public class BasicOzoneFileSystem extends FileSystem {
|
|||
"Invalid scheme provided in " + name);
|
||||
|
||||
String authority = name.getAuthority();
|
||||
if (authority == null) {
|
||||
// authority is null when fs.defaultFS is not a qualified o3fs URI and
|
||||
// o3fs:/// is passed to the client. matcher will NPE if authority is null
|
||||
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
|
||||
}
|
||||
|
||||
Matcher matcher = URL_SCHEMA_PATTERN.matcher(authority);
|
||||
|
||||
|
@ -126,7 +131,7 @@ public class BasicOzoneFileSystem extends FileSystem {
|
|||
int omPort = -1;
|
||||
if (!isEmpty(remaining)) {
|
||||
String[] parts = remaining.split(":");
|
||||
// Array length should be either 1(host) or 2(host:port)
|
||||
// Array length should be either 1(hostname or service id) or 2(host:port)
|
||||
if (parts.length > 2) {
|
||||
throw new IllegalArgumentException(getUriExceptionText(conf));
|
||||
}
|
||||
|
@ -137,9 +142,6 @@ public class BasicOzoneFileSystem extends FileSystem {
|
|||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException(getUriExceptionText(conf));
|
||||
}
|
||||
} else {
|
||||
// If port number is not specified, read it from config
|
||||
omPort = OmUtils.getOmRpcPort(conf);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,348 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.ozone;
|
||||
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsShell;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.client.ObjectStore;
|
||||
import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
||||
import org.apache.hadoop.ozone.client.OzoneVolume;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.apache.hadoop.ozone.om.OMStorage;
|
||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.ratis.util.LifeCycle;
|
||||
import org.hamcrest.core.StringContains;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.hadoop.hdds.HddsUtils.getHostName;
|
||||
import static org.apache.hadoop.hdds.HddsUtils.getHostPort;
|
||||
|
||||
/**
|
||||
* Test client-side URI handling with Ozone Manager HA.
|
||||
*/
|
||||
public class TestOzoneFsHAURLs {
|
||||
public static final Logger LOG = LoggerFactory.getLogger(
|
||||
TestOzoneFsHAURLs.class);
|
||||
|
||||
private OzoneConfiguration conf;
|
||||
private MiniOzoneCluster cluster;
|
||||
private String omId;
|
||||
private String omServiceId;
|
||||
private String clusterId;
|
||||
private String scmId;
|
||||
private OzoneManager om;
|
||||
private int numOfOMs;
|
||||
|
||||
private String volumeName;
|
||||
private String bucketName;
|
||||
private String rootPath;
|
||||
|
||||
private final String o3fsImplKey =
|
||||
"fs." + OzoneConsts.OZONE_URI_SCHEME + ".impl";
|
||||
private final String o3fsImplValue =
|
||||
"org.apache.hadoop.fs.ozone.OzoneFileSystem";
|
||||
|
||||
private static final long LEADER_ELECTION_TIMEOUT = 500L;
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
conf = new OzoneConfiguration();
|
||||
omId = UUID.randomUUID().toString();
|
||||
omServiceId = "om-service-test1";
|
||||
numOfOMs = 3;
|
||||
clusterId = UUID.randomUUID().toString();
|
||||
scmId = UUID.randomUUID().toString();
|
||||
final String path = GenericTestUtils.getTempPath(omId);
|
||||
java.nio.file.Path metaDirPath = java.nio.file.Paths.get(path, "om-meta");
|
||||
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
|
||||
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString());
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
|
||||
conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
|
||||
conf.setTimeDuration(
|
||||
OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
|
||||
LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
|
||||
OMStorage omStore = new OMStorage(conf);
|
||||
omStore.setClusterId(clusterId);
|
||||
omStore.setScmId(scmId);
|
||||
// writes the version file properties
|
||||
omStore.initialize();
|
||||
|
||||
// Start the cluster
|
||||
cluster = MiniOzoneCluster.newHABuilder(conf)
|
||||
.setClusterId(clusterId)
|
||||
.setScmId(scmId)
|
||||
.setOMServiceId(omServiceId)
|
||||
.setNumOfOzoneManagers(numOfOMs)
|
||||
.build();
|
||||
cluster.waitForClusterToBeReady();
|
||||
|
||||
om = cluster.getOzoneManager();
|
||||
Assert.assertEquals(LifeCycle.State.RUNNING, om.getOmRatisServerState());
|
||||
|
||||
volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
||||
ObjectStore objectStore =
|
||||
OzoneClientFactory.getRpcClient(omServiceId, conf).getObjectStore();
|
||||
objectStore.createVolume(volumeName);
|
||||
|
||||
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
|
||||
bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
|
||||
retVolumeinfo.createBucket(bucketName);
|
||||
|
||||
rootPath = String.format("%s://%s.%s.%s/", OzoneConsts.OZONE_URI_SCHEME,
|
||||
bucketName, volumeName, omServiceId);
|
||||
// Set fs.defaultFS
|
||||
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
// Create some dirs
|
||||
Path root = new Path("/");
|
||||
Path dir1 = new Path(root, "dir1");
|
||||
Path dir12 = new Path(dir1, "dir12");
|
||||
Path dir2 = new Path(root, "dir2");
|
||||
fs.mkdirs(dir12);
|
||||
fs.mkdirs(dir2);
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutdown() {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the leader OM's RPC address in the MiniOzoneHACluster
|
||||
*/
|
||||
private String getLeaderOMNodeAddr() {
|
||||
String leaderOMNodeAddr = null;
|
||||
Collection<String> omNodeIds = OmUtils.getOMNodeIds(conf, omServiceId);
|
||||
assert(omNodeIds.size() == numOfOMs);
|
||||
MiniOzoneHAClusterImpl haCluster = (MiniOzoneHAClusterImpl) cluster;
|
||||
// Note: this loop may be implemented inside MiniOzoneHAClusterImpl
|
||||
for (String omNodeId : omNodeIds) {
|
||||
// Find the leader OM
|
||||
if (!haCluster.getOzoneManager(omNodeId).isLeader()) {
|
||||
continue;
|
||||
}
|
||||
// ozone.om.address.omServiceId.omNode
|
||||
String leaderOMNodeAddrKey = OmUtils.addKeySuffixes(
|
||||
OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId);
|
||||
leaderOMNodeAddr = conf.get(leaderOMNodeAddrKey);
|
||||
LOG.info("Found leader OM: nodeId=" + omNodeId + ", " +
|
||||
leaderOMNodeAddrKey + "=" + leaderOMNodeAddr);
|
||||
// Leader found, no need to continue loop
|
||||
break;
|
||||
}
|
||||
// There has to be a leader
|
||||
assert(leaderOMNodeAddr != null);
|
||||
return leaderOMNodeAddr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get host name from an address. This uses getHostName() internally.
|
||||
* @param addr Address with port number
|
||||
* @return Host name
|
||||
*/
|
||||
private String getHostFromAddress(String addr) {
|
||||
Optional<String> hostOptional = getHostName(addr);
|
||||
assert(hostOptional.isPresent());
|
||||
return hostOptional.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get port number from an address. This uses getHostPort() internally.
|
||||
* @param addr Address with port
|
||||
* @return Port number
|
||||
*/
|
||||
private int getPortFromAddress(String addr) {
|
||||
Optional<Integer> portOptional = getHostPort(addr);
|
||||
assert(portOptional.isPresent());
|
||||
return portOptional.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test OM HA URLs with qualified fs.defaultFS.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testWithQualifiedDefaultFS() throws Exception {
|
||||
OzoneConfiguration clientConf = new OzoneConfiguration(conf);
|
||||
clientConf.setQuietMode(false);
|
||||
clientConf.set(o3fsImplKey, o3fsImplValue);
|
||||
// fs.defaultFS = o3fs://bucketName.volumeName.omServiceId/
|
||||
clientConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
|
||||
|
||||
// Pick leader OM's RPC address and assign it to ozone.om.address for
|
||||
// the test case: ozone fs -ls o3fs://bucket.volume.om1/
|
||||
String leaderOMNodeAddr = getLeaderOMNodeAddr();
|
||||
// ozone.om.address was set to service id in MiniOzoneHAClusterImpl
|
||||
clientConf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY, leaderOMNodeAddr);
|
||||
|
||||
FsShell shell = new FsShell(clientConf);
|
||||
int res;
|
||||
try {
|
||||
// Test case 1: ozone fs -ls /
|
||||
// Expectation: Success.
|
||||
res = ToolRunner.run(shell, new String[] {"-ls", "/"});
|
||||
// Check return value, should be 0 (success)
|
||||
Assert.assertEquals(res, 0);
|
||||
|
||||
// Test case 2: ozone fs -ls o3fs:///
|
||||
// Expectation: Success. fs.defaultFS is a fully qualified path.
|
||||
res = ToolRunner.run(shell, new String[] {"-ls", "o3fs:///"});
|
||||
Assert.assertEquals(res, 0);
|
||||
|
||||
// Test case 3: ozone fs -ls o3fs://bucket.volume/
|
||||
// Expectation: Fail. Must have service id or host name when HA is enabled
|
||||
String unqualifiedPath1 = String.format("%s://%s.%s/",
|
||||
OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName);
|
||||
try (GenericTestUtils.SystemErrCapturer capture =
|
||||
new GenericTestUtils.SystemErrCapturer()) {
|
||||
res = ToolRunner.run(shell, new String[] {"-ls", unqualifiedPath1});
|
||||
// Check stderr, inspired by testDFSWithInvalidCommmand
|
||||
Assert.assertThat("Command did not print the error message " +
|
||||
"correctly for test case: ozone fs -ls o3fs://bucket.volume/",
|
||||
capture.getOutput(), StringContains.containsString(
|
||||
"-ls: Service ID or host name must not"
|
||||
+ " be omitted when ozone.om.service.ids is defined."));
|
||||
}
|
||||
// Check return value, should be -1 (failure)
|
||||
Assert.assertEquals(res, -1);
|
||||
|
||||
// Test case 4: ozone fs -ls o3fs://bucket.volume.om1/
|
||||
// Expectation: Success. The client should use the port number
|
||||
// set in ozone.om.address.
|
||||
String qualifiedPath1 = String.format("%s://%s.%s.%s/",
|
||||
OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName,
|
||||
getHostFromAddress(leaderOMNodeAddr));
|
||||
res = ToolRunner.run(shell, new String[] {"-ls", qualifiedPath1});
|
||||
// Note: this test case will fail if the port is not from the leader node
|
||||
Assert.assertEquals(res, 0);
|
||||
|
||||
// Test case 5: ozone fs -ls o3fs://bucket.volume.om1:port/
|
||||
// Expectation: Success.
|
||||
String qualifiedPath2 = String.format("%s://%s.%s.%s/",
|
||||
OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName,
|
||||
leaderOMNodeAddr);
|
||||
res = ToolRunner.run(shell, new String[] {"-ls", qualifiedPath2});
|
||||
Assert.assertEquals(res, 0);
|
||||
|
||||
// Test case 6: ozone fs -ls o3fs://bucket.volume.id1/
|
||||
// Expectation: Success.
|
||||
String qualifiedPath3 = String.format("%s://%s.%s.%s/",
|
||||
OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName, omServiceId);
|
||||
res = ToolRunner.run(shell, new String[] {"-ls", qualifiedPath3});
|
||||
Assert.assertEquals(res, 0);
|
||||
|
||||
// Test case 7: ozone fs -ls o3fs://bucket.volume.id1:port/
|
||||
// Expectation: Fail. Service ID does not use port information.
|
||||
// Use the port number from leader OM (doesn't really matter)
|
||||
String unqualifiedPath2 = String.format("%s://%s.%s.%s:%d/",
|
||||
OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName,
|
||||
omServiceId, getPortFromAddress(leaderOMNodeAddr));
|
||||
try (GenericTestUtils.SystemErrCapturer capture =
|
||||
new GenericTestUtils.SystemErrCapturer()) {
|
||||
res = ToolRunner.run(shell, new String[] {"-ls", unqualifiedPath2});
|
||||
// Check stderr
|
||||
Assert.assertThat("Command did not print the error message " +
|
||||
"correctly for test case: "
|
||||
+ "ozone fs -ls o3fs://bucket.volume.id1:port/",
|
||||
capture.getOutput(), StringContains.containsString(
|
||||
"does not use port information"));
|
||||
}
|
||||
// Check return value, should be -1 (failure)
|
||||
Assert.assertEquals(res, -1);
|
||||
} finally {
|
||||
shell.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function for testOtherDefaultFS(),
|
||||
* run fs -ls o3fs:/// against different fs.defaultFS input.
|
||||
*
|
||||
* @param defaultFS Desired fs.defaultFS to be used in the test
|
||||
* @throws Exception
|
||||
*/
|
||||
private void testWithDefaultFS(String defaultFS) throws Exception {
|
||||
OzoneConfiguration clientConf = new OzoneConfiguration(conf);
|
||||
clientConf.setQuietMode(false);
|
||||
clientConf.set(o3fsImplKey, o3fsImplValue);
|
||||
// fs.defaultFS = file:///
|
||||
clientConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
|
||||
defaultFS);
|
||||
|
||||
FsShell shell = new FsShell(clientConf);
|
||||
try {
|
||||
// Test case: ozone fs -ls o3fs:///
|
||||
// Expectation: Fail. fs.defaultFS is not a qualified o3fs URI.
|
||||
int res = ToolRunner.run(shell, new String[] {"-ls", "o3fs:///"});
|
||||
Assert.assertEquals(res, -1);
|
||||
} finally {
|
||||
shell.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test OM HA URLs with some unqualified fs.defaultFS.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testOtherDefaultFS() throws Exception {
|
||||
// Test scenarios where fs.defaultFS isn't a fully qualified o3fs
|
||||
|
||||
// fs.defaultFS = file:///
|
||||
testWithDefaultFS(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
|
||||
|
||||
// fs.defaultFS = hdfs://ns1/
|
||||
testWithDefaultFS("hdfs://ns1/");
|
||||
|
||||
// fs.defaultFS = o3fs:///
|
||||
String unqualifiedFs1 = String.format(
|
||||
"%s:///", OzoneConsts.OZONE_URI_SCHEME);
|
||||
testWithDefaultFS(unqualifiedFs1);
|
||||
|
||||
// fs.defaultFS = o3fs://bucketName.volumeName/
|
||||
String unqualifiedFs2 = String.format("%s://%s.%s/",
|
||||
OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName);
|
||||
testWithDefaultFS(unqualifiedFs2);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue