YARN-9086. [CSI] Run csi-driver-adaptor as aux service. Contributed by Weiwei Yang.

This commit is contained in:
Weiwei Yang 2019-01-29 14:53:08 +08:00
parent 2d06112b74
commit 085f0e8ae7
9 changed files with 728 additions and 158 deletions

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* csi-adaptor is a plugin, user can provide customized implementation
* according to this interface. NM will init and load this into a NM aux
* service, and it can run multiple csi-adaptor servers.
*
* User needs to implement all the methods defined in
* {@link CsiAdaptorProtocol}, and plus the methods in this interface.
*/
public interface CsiAdaptorPlugin extends CsiAdaptorProtocol {
/**
* A csi-adaptor implementation can init its state within this function.
* Configuration is available so the implementation can retrieve some
* customized configuration from yarn-site.xml.
* @param driverName the name of the csi-driver.
* @param conf configuration.
* @throws YarnException
*/
void init(String driverName, Configuration conf) throws YarnException;
/**
* Returns the driver name of the csi-driver this adaptor works with.
* The name should be consistent on all the places being used, ideally
* it should come from the value when init is done.
* @return the name of the csi-driver that this adaptor works with.
*/
String getDriverName();
}

View File

@ -3489,6 +3489,8 @@ public class YarnConfiguration extends Configuration {
".endpoint";
public static final String NM_CSI_ADAPTOR_ADDRESS_SUFFIX =
".address";
public static final String NM_CSI_ADAPTOR_CLASS =
".class";
/**
* One or more socket addresses for csi-adaptor.
* Multiple addresses are delimited by ",".

View File

@ -4102,6 +4102,14 @@
"yarn.nodemanager.csi-driver-adaptor.${NAME}.address"
The 1st property defines where the driver's endpoint is;
2nd property defines where the mapping csi-driver-adaptor's address is.
What's more, an optional csi-driver-adaptor class can be defined
for each csi-driver:
"yarn.nodemanager.csi-driver.${NAME}.class"
once given, the adaptor will be initiated with the given class instead
of the default implementation
org.apache.hadoop.yarn.csi.adaptor.DefaultCsiAdaptorImpl. User can plug
customized adaptor code for csi-driver with this configuration
if necessary.
</description>
<name>yarn.nodemanager.csi-driver.names</name>
<value></value>

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.csi.adaptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Desired csi-adaptor implementation is configurable, default to
* CsiAdaptorProtocolService. If user wants to have a different implementation,
* just to configure a different class for the csi-driver.
*/
public final class CsiAdaptorFactory {
private static final Logger LOG =
LoggerFactory.getLogger(CsiAdaptorFactory.class);
private CsiAdaptorFactory() {
// hide constructor for this factory class.
}
/**
* Load csi-driver-adaptor from configuration. If the configuration is not
* specified, the default implementation
* for the adaptor is {@link DefaultCsiAdaptorImpl}. If the configured class
* is not a valid variation of {@link CsiAdaptorPlugin} or the class cannot
* be found, this function will throw a RuntimeException.
* @param driverName
* @param conf
* @return CsiAdaptorPlugin
* @throws YarnException if unable to create the adaptor class.
* @throws RuntimeException if given class is not found or not
* an instance of {@link CsiAdaptorPlugin}
*/
public static CsiAdaptorPlugin getAdaptor(String driverName,
Configuration conf) throws YarnException {
// load configuration
String configName = YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ driverName + YarnConfiguration.NM_CSI_ADAPTOR_CLASS;
Class<? extends CsiAdaptorPlugin> impl = conf.getClass(configName,
DefaultCsiAdaptorImpl.class, CsiAdaptorPlugin.class);
if (impl == null) {
throw new YarnException("Unable to init csi-adaptor from the"
+ " class specified via " + configName);
}
// init the adaptor
CsiAdaptorPlugin instance = ReflectionUtils.newInstance(impl, conf);
LOG.info("csi-adaptor initiated, implementation: "
+ impl.getCanonicalName());
return instance;
}
}

View File

@ -17,11 +17,11 @@
*/
package org.apache.hadoop.yarn.csi.adaptor;
import com.google.common.annotations.VisibleForTesting;
import csi.v0.Csi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
@ -30,27 +30,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import org.apache.hadoop.yarn.csi.client.CsiClient;
import org.apache.hadoop.yarn.csi.client.CsiClientImpl;
import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
/**
* This is a Hadoop RPC server, we uses the Hadoop RPC framework here
* because we need to stick to the security model current Hadoop supports.
*/
public class CsiAdaptorProtocolService extends AuxiliaryService
public class CsiAdaptorProtocolService extends AbstractService
implements CsiAdaptorProtocol {
private static final Logger LOG =
@ -58,35 +51,17 @@ public class CsiAdaptorProtocolService extends AuxiliaryService
private Server server;
private InetSocketAddress adaptorServiceAddress;
private CsiClient csiClient;
private String csiDriverName;
private CsiAdaptorPlugin serverImpl;
public CsiAdaptorProtocolService() {
public CsiAdaptorProtocolService(CsiAdaptorPlugin adaptorImpl) {
super(CsiAdaptorProtocolService.class.getName());
// TODO read this from configuration
this.csiDriverName = "ch.ctrox.csi.s3-driver";
}
public CsiAdaptorProtocolService(String driverName,
String domainSocketPath) {
super(CsiAdaptorProtocolService.class.getName());
this.csiClient = new CsiClientImpl(domainSocketPath);
this.csiDriverName = driverName;
}
@VisibleForTesting
public void setCsiClient(CsiClient client) {
this.csiClient = client;
this.serverImpl = adaptorImpl;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
String driverEndpoint = CsiConfigUtils
.getCsiDriverEndpoint(csiDriverName, conf);
this.csiClient = new CsiClientImpl(driverEndpoint);
adaptorServiceAddress = CsiConfigUtils
.getCsiAdaptorAddressForDriver(csiDriverName, conf);
.getCsiAdaptorAddressForDriver(serverImpl.getDriverName(), conf);
super.serviceInit(conf);
}
@ -96,7 +71,7 @@ public class CsiAdaptorProtocolService extends AuxiliaryService
YarnRPC rpc = YarnRPC.create(conf);
this.server = rpc.getServer(
CsiAdaptorProtocol.class,
this, adaptorServiceAddress, conf, null, 1);
serverImpl, adaptorServiceAddress, conf, null, 1);
this.server.start();
LOG.info("{} started, listening on address: {}",
CsiAdaptorProtocolService.class.getName(),
@ -115,76 +90,25 @@ public class CsiAdaptorProtocolService extends AuxiliaryService
@Override
public GetPluginInfoResponse getPluginInfo(
GetPluginInfoRequest request) throws YarnException, IOException {
Csi.GetPluginInfoResponse response = csiClient.getPluginInfo();
return ProtoTranslatorFactory.getTranslator(
GetPluginInfoResponse.class, Csi.GetPluginInfoResponse.class)
.convertFrom(response);
return serverImpl.getPluginInfo(request);
}
@Override
public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
ValidateVolumeCapabilitiesRequest request) throws YarnException,
IOException {
Csi.ValidateVolumeCapabilitiesRequest req = ProtoTranslatorFactory
.getTranslator(ValidateVolumeCapabilitiesRequest.class,
Csi.ValidateVolumeCapabilitiesRequest.class)
.convertTo(request);
Csi.ValidateVolumeCapabilitiesResponse response =
csiClient.validateVolumeCapabilities(req);
return ProtoTranslatorFactory.getTranslator(
ValidateVolumeCapabilitiesResponse.class,
Csi.ValidateVolumeCapabilitiesResponse.class)
.convertFrom(response);
return serverImpl.validateVolumeCapacity(request);
}
@Override
public NodePublishVolumeResponse nodePublishVolume(
NodePublishVolumeRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Received nodePublishVolume call, request: {}",
request.toString());
}
Csi.NodePublishVolumeRequest req = ProtoTranslatorFactory
.getTranslator(NodePublishVolumeRequest.class,
Csi.NodePublishVolumeRequest.class).convertTo(request);
if (LOG.isDebugEnabled()) {
LOG.debug("Translate to CSI proto message: {}", req.toString());
}
csiClient.nodePublishVolume(req);
return NodePublishVolumeResponse.newInstance();
return serverImpl.nodePublishVolume(request);
}
@Override
public NodeUnpublishVolumeResponse nodeUnpublishVolume(
NodeUnpublishVolumeRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Received nodeUnpublishVolume call, request: {}",
request.toString());
}
Csi.NodeUnpublishVolumeRequest req = ProtoTranslatorFactory
.getTranslator(NodeUnpublishVolumeRequest.class,
Csi.NodeUnpublishVolumeRequest.class).convertTo(request);
if (LOG.isDebugEnabled()) {
LOG.debug("Translate to CSI proto message: {}", req.toString());
}
csiClient.nodeUnpublishVolume(req);
return NodeUnpublishVolumeResponse.newInstance();
}
@Override
public void initializeApplication(
ApplicationInitializationContext initAppContext) {
// do nothing
}
@Override
public void stopApplication(
ApplicationTerminationContext stopAppContext) {
// do nothing
}
@Override
public ByteBuffer getMetaData() {
return ByteBuffer.allocate(0);
return serverImpl.nodeUnpublishVolume(request);
}
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.csi.adaptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* NM manages csi-adaptors as a single NM AUX service, this service
* manages a set of rpc services and each of them serves one particular
* csi-driver. It loads all available drivers from configuration, and
* find a csi-driver-adaptor implementation class for each of them. At last
* it brings up all of them as a composite service.
*/
public class CsiAdaptorServices extends AuxiliaryService {
private static final Logger LOG =
LoggerFactory.getLogger(CsiAdaptorServices.class);
private List<CsiAdaptorProtocolService> serviceList;
protected CsiAdaptorServices() {
super(CsiAdaptorServices.class.getName());
serviceList = new ArrayList<>();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
// load configuration and init adaptors
String[] names = CsiConfigUtils.getCsiDriverNames(conf);
if (names != null && names.length > 0) {
for (String driverName : names) {
LOG.info("Adding csi-driver-adaptor for csi-driver {}", driverName);
CsiAdaptorPlugin serviceImpl = CsiAdaptorFactory
.getAdaptor(driverName, conf);
serviceImpl.init(driverName, conf);
CsiAdaptorProtocolService service =
new CsiAdaptorProtocolService(serviceImpl);
serviceList.add(service);
service.serviceInit(conf);
}
}
super.serviceInit(conf);
}
@Override
protected void serviceStop() throws Exception {
if (serviceList != null && serviceList.size() > 0) {
for (CsiAdaptorProtocolService service : serviceList) {
try {
service.serviceStop();
} catch (Exception e) {
LOG.warn("Unable to stop service " + service.getName(), e);
}
}
}
}
@Override
protected void serviceStart() throws Exception {
if (serviceList != null && serviceList.size() > 0) {
for (CsiAdaptorProtocolService service : serviceList) {
service.serviceStart();
}
}
}
@Override
public void initializeApplication(
ApplicationInitializationContext initAppContext) {
// do nothing
}
@Override
public void stopApplication(
ApplicationTerminationContext stopAppContext) {
// do nothing
}
@Override
public ByteBuffer getMetaData() {
return ByteBuffer.allocate(0);
}
}

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.csi.adaptor;
import csi.v0.Csi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import org.apache.hadoop.yarn.csi.client.CsiClient;
import org.apache.hadoop.yarn.csi.client.CsiClientImpl;
import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* The default implementation of csi-driver-adaptor service.
*/
public class DefaultCsiAdaptorImpl implements CsiAdaptorPlugin {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultCsiAdaptorImpl.class);
private String driverName;
private CsiClient csiClient;
public DefaultCsiAdaptorImpl() {
// use default constructor for reflection
}
@Override
public void init(String driverName, Configuration conf)
throws YarnException {
// if the driver end point is invalid, following code will fail.
String driverEndpoint = CsiConfigUtils
.getCsiDriverEndpoint(driverName, conf);
LOG.info("This csi-adaptor is configured to contact with"
+ " the csi-driver {} via gRPC endpoint: {}",
driverName, driverEndpoint);
this.csiClient = new CsiClientImpl(driverEndpoint);
this.driverName = driverName;
}
@Override
public String getDriverName() {
return driverName;
}
@Override
public GetPluginInfoResponse getPluginInfo(
GetPluginInfoRequest request) throws YarnException, IOException {
Csi.GetPluginInfoResponse response = csiClient.getPluginInfo();
return ProtoTranslatorFactory.getTranslator(
GetPluginInfoResponse.class, Csi.GetPluginInfoResponse.class)
.convertFrom(response);
}
@Override
public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
ValidateVolumeCapabilitiesRequest request) throws YarnException,
IOException {
Csi.ValidateVolumeCapabilitiesRequest req = ProtoTranslatorFactory
.getTranslator(ValidateVolumeCapabilitiesRequest.class,
Csi.ValidateVolumeCapabilitiesRequest.class)
.convertTo(request);
Csi.ValidateVolumeCapabilitiesResponse response =
csiClient.validateVolumeCapabilities(req);
return ProtoTranslatorFactory.getTranslator(
ValidateVolumeCapabilitiesResponse.class,
Csi.ValidateVolumeCapabilitiesResponse.class)
.convertFrom(response);
}
@Override
public NodePublishVolumeResponse nodePublishVolume(
NodePublishVolumeRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Received nodePublishVolume call, request: {}",
request.toString());
}
Csi.NodePublishVolumeRequest req = ProtoTranslatorFactory
.getTranslator(NodePublishVolumeRequest.class,
Csi.NodePublishVolumeRequest.class).convertTo(request);
if (LOG.isDebugEnabled()) {
LOG.debug("Translate to CSI proto message: {}", req.toString());
}
csiClient.nodePublishVolume(req);
return NodePublishVolumeResponse.newInstance();
}
@Override
public NodeUnpublishVolumeResponse nodeUnpublishVolume(
NodeUnpublishVolumeRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Received nodeUnpublishVolume call, request: {}",
request.toString());
}
Csi.NodeUnpublishVolumeRequest req = ProtoTranslatorFactory
.getTranslator(NodeUnpublishVolumeRequest.class,
Csi.NodeUnpublishVolumeRequest.class).convertTo(request);
if (LOG.isDebugEnabled()) {
LOG.debug("Translate to CSI proto message: {}", req.toString());
}
csiClient.nodeUnpublishVolume(req);
return NodeUnpublishVolumeResponse.newInstance();
}
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.csi.adaptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
import java.io.IOException;
/**
* This class is used by {@link TestCsiAdaptorService} for testing.
* It gives some dummy implementation for a adaptor plugin, and used to
* verify the plugin can be properly loaded by NM and execution logic is
* as expected.
*
* This is created as a separated class instead of an inner class, because
* {@link CsiAdaptorServices} is loading classes using conf.getClass(),
* the utility class is unable to resolve inner classes.
*/
public class MockCsiAdaptor implements CsiAdaptorPlugin {
private String driverName;
@Override
public void init(String driverName, Configuration conf)
throws YarnException {
this.driverName = driverName;
}
@Override
public String getDriverName() {
return this.driverName;
}
@Override
public GetPluginInfoResponse getPluginInfo(
GetPluginInfoRequest request) throws YarnException, IOException {
return GetPluginInfoResponse.newInstance(driverName,
"1.0");
}
@Override
public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
ValidateVolumeCapabilitiesRequest request)
throws YarnException, IOException {
return ValidateVolumeCapabilitiesResponse.newInstance(true,
"verified via MockCsiAdaptor");
}
@Override
public NodePublishVolumeResponse nodePublishVolume(
NodePublishVolumeRequest request) throws YarnException, IOException {
return null;
}
@Override
public NodeUnpublishVolumeResponse nodeUnpublishVolume(
NodeUnpublishVolumeRequest request) throws YarnException, IOException {
return null;
}
}

View File

@ -27,13 +27,19 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
import org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
import org.apache.hadoop.yarn.client.NMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.csi.client.ICsiClientTest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.junit.AfterClass;
@ -72,6 +78,39 @@ public class TestCsiAdaptorService {
}
}
private interface FakeCsiAdaptor extends CsiAdaptorPlugin {
default void init(String driverName, Configuration conf)
throws YarnException {
return;
}
default String getDriverName() {
return null;
}
default GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request)
throws YarnException, IOException {
return null;
}
default ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
ValidateVolumeCapabilitiesRequest request) throws YarnException,
IOException {
return null;
}
default NodePublishVolumeResponse nodePublishVolume(
NodePublishVolumeRequest request) throws YarnException, IOException {
return null;
}
default NodeUnpublishVolumeResponse nodeUnpublishVolume(
NodeUnpublishVolumeRequest request) throws YarnException, IOException{
return null;
}
}
@Test
public void testValidateVolume() throws IOException, YarnException {
ServerSocket ss = new ServerSocket(0);
@ -83,50 +122,52 @@ public class TestCsiAdaptorService {
address);
conf.set(
YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint",
"unix:///tmp/test-driver.scok");
CsiAdaptorProtocolService service =
new CsiAdaptorProtocolService("test-driver", domainSocket);
service.init(conf);
service.start();
"unix:///tmp/test-driver.sock");
// inject a fake CSI client
// inject a fake CSI adaptor
// this client validates if the ValidateVolumeCapabilitiesRequest
// is integrity, and then reply a fake response
service.setCsiClient(new ICsiClientTest() {
CsiAdaptorPlugin plugin = new FakeCsiAdaptor() {
@Override
public Csi.GetPluginInfoResponse getPluginInfo() {
return Csi.GetPluginInfoResponse.newBuilder()
.setName("test-plugin")
.setVendorVersion("0.1")
.build();
public String getDriverName() {
return "test-driver";
}
@Override
public GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request) {
return GetPluginInfoResponse.newInstance("test-plugin", "0.1");
}
@Override
public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
Csi.ValidateVolumeCapabilitiesRequest request) {
public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
ValidateVolumeCapabilitiesRequest request) throws YarnException,
IOException {
// validate we get all info from the request
Assert.assertEquals("volume-id-0000123", request.getVolumeId());
Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
Assert.assertEquals(1, request.getVolumeCapabilities().size());
Assert.assertEquals(Csi.VolumeCapability.AccessMode
.newBuilder().setModeValue(5).build(),
request.getVolumeCapabilities(0).getAccessMode());
Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
Assert.assertEquals(2, request.getVolumeCapabilities(0)
.getMount().getMountFlagsCount());
Assert.assertTrue(request.getVolumeCapabilities(0)
.getMount().getMountFlagsList().contains("mountFlag1"));
Assert.assertTrue(request.getVolumeCapabilities(0)
.getMount().getMountFlagsList().contains("mountFlag2"));
Assert.assertEquals(2, request.getVolumeAttributesCount());
Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
.newBuilder().setModeValue(5).build().getMode().name(),
request.getVolumeCapabilities().get(0).getAccessMode().name());
Assert.assertEquals(2, request.getVolumeCapabilities().get(0)
.getMountFlags().size());
Assert.assertTrue(request.getVolumeCapabilities().get(0)
.getMountFlags().contains("mountFlag1"));
Assert.assertTrue(request.getVolumeCapabilities().get(0)
.getMountFlags().contains("mountFlag2"));
Assert.assertEquals(2, request.getVolumeAttributes().size());
Assert.assertEquals("v1", request.getVolumeAttributes().get("k1"));
Assert.assertEquals("v2", request.getVolumeAttributes().get("k2"));
// return a fake result
return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
.setSupported(false)
.setMessage("this is a test")
.build();
return ValidateVolumeCapabilitiesResponse
.newInstance(false, "this is a test");
}
});
};
CsiAdaptorProtocolService service =
new CsiAdaptorProtocolService(plugin);
service.init(conf);
service.start();
try (CsiAdaptorProtocolPBClientImpl client =
new CsiAdaptorProtocolPBClientImpl(1L, address, new Configuration())) {
@ -161,50 +202,53 @@ public class TestCsiAdaptorService {
address);
conf.set(
YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint",
"unix:///tmp/test-driver.scok");
CsiAdaptorProtocolService service =
new CsiAdaptorProtocolService("test-driver", domainSocket);
service.init(conf);
service.start();
"unix:///tmp/test-driver.sock");
// inject a fake CSI client
// inject a fake CSI adaptor
// this client validates if the ValidateVolumeCapabilitiesRequest
// is integrity, and then reply a fake response
service.setCsiClient(new ICsiClientTest() {
FakeCsiAdaptor plugin = new FakeCsiAdaptor() {
@Override
public Csi.GetPluginInfoResponse getPluginInfo() {
return Csi.GetPluginInfoResponse.newBuilder()
.setName("test-plugin")
.setVendorVersion("0.1")
.build();
public String getDriverName() {
return "test-driver";
}
@Override
public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
Csi.ValidateVolumeCapabilitiesRequest request) {
public GetPluginInfoResponse getPluginInfo(
GetPluginInfoRequest request) throws YarnException, IOException {
return GetPluginInfoResponse.newInstance("test-plugin", "0.1");
}
@Override
public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
ValidateVolumeCapabilitiesRequest request)
throws YarnException, IOException {
// validate we get all info from the request
Assert.assertEquals("volume-id-0000123", request.getVolumeId());
Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
Assert.assertEquals(Csi.VolumeCapability.AccessMode
.newBuilder().setModeValue(5).build(),
request.getVolumeCapabilities(0).getAccessMode());
Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
Assert.assertEquals(2, request.getVolumeCapabilities(0)
.getMount().getMountFlagsCount());
Assert.assertTrue(request.getVolumeCapabilities(0)
.getMount().getMountFlagsList().contains("mountFlag1"));
Assert.assertTrue(request.getVolumeCapabilities(0)
.getMount().getMountFlagsList().contains("mountFlag2"));
Assert.assertEquals(2, request.getVolumeAttributesCount());
Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
Assert.assertEquals(1, request.getVolumeCapabilities().size());
Assert.assertEquals(
Csi.VolumeCapability.AccessMode.newBuilder().setModeValue(5)
.build().getMode().name(),
request.getVolumeCapabilities().get(0).getAccessMode().name());
Assert.assertEquals(2,
request.getVolumeCapabilities().get(0).getMountFlags().size());
Assert.assertTrue(request.getVolumeCapabilities().get(0).getMountFlags()
.contains("mountFlag1"));
Assert.assertTrue(request.getVolumeCapabilities().get(0).getMountFlags()
.contains("mountFlag2"));
Assert.assertEquals(2, request.getVolumeAttributes().size());
Assert.assertEquals("v1", request.getVolumeAttributes().get("k1"));
Assert.assertEquals("v2", request.getVolumeAttributes().get("k2"));
// return a fake result
return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
.setSupported(false)
.setMessage("this is a test")
.build();
return ValidateVolumeCapabilitiesResponse
.newInstance(false, "this is a test");
}
});
};
CsiAdaptorProtocolService service =
new CsiAdaptorProtocolService(plugin);
service.init(conf);
service.start();
YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
@ -232,7 +276,7 @@ public class TestCsiAdaptorService {
public void testMissingConfiguration() {
Configuration conf = new Configuration();
CsiAdaptorProtocolService service =
new CsiAdaptorProtocolService("test-driver", domainSocket);
new CsiAdaptorProtocolService(new FakeCsiAdaptor() {});
service.init(conf);
}
@ -243,7 +287,7 @@ public class TestCsiAdaptorService {
+ "test-driver-0001.address",
"0.0.0.0:-100"); // this is an invalid address
CsiAdaptorProtocolService service =
new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
new CsiAdaptorProtocolService(new FakeCsiAdaptor() {});
service.init(conf);
}
@ -254,7 +298,151 @@ public class TestCsiAdaptorService {
+ "test-driver-0001.address",
"192.0.1:8999"); // this is an invalid ip address
CsiAdaptorProtocolService service =
new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
new CsiAdaptorProtocolService(new FakeCsiAdaptor() {});
service.init(conf);
}
@Test
public void testCustomizedAdaptor() throws IOException, YarnException {
ServerSocket ss = new ServerSocket(0);
ss.close();
InetSocketAddress address = new InetSocketAddress(ss.getLocalPort());
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_CSI_DRIVER_NAMES, "customized-driver");
conf.setSocketAddr(
YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "customized-driver.address",
address);
conf.set(
YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "customized-driver.class",
"org.apache.hadoop.yarn.csi.adaptor.MockCsiAdaptor");
conf.set(
YarnConfiguration.NM_CSI_DRIVER_PREFIX + "customized-driver.endpoint",
"unix:///tmp/customized-driver.sock");
CsiAdaptorServices services =
new CsiAdaptorServices();
services.init(conf);
services.start();
YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
CsiAdaptorProtocol adaptorClient = NMProxy
.createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
NetUtils.createSocketAddrForHost("localhost", ss.getLocalPort()));
// Test getPluginInfo
GetPluginInfoResponse pluginInfo =
adaptorClient.getPluginInfo(GetPluginInfoRequest.newInstance());
Assert.assertEquals(pluginInfo.getDriverName(), "customized-driver");
Assert.assertEquals(pluginInfo.getVersion(), "1.0");
// Test validateVolumeCapacity
ValidateVolumeCapabilitiesRequest request =
ValidateVolumeCapabilitiesRequestPBImpl
.newInstance("volume-id-0000123",
ImmutableList.of(new ValidateVolumeCapabilitiesRequest
.VolumeCapability(
MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
ImmutableList.of("mountFlag1", "mountFlag2"))),
ImmutableMap.of("k1", "v1", "k2", "v2"));
ValidateVolumeCapabilitiesResponse response = adaptorClient
.validateVolumeCapacity(request);
Assert.assertEquals(true, response.isSupported());
Assert.assertEquals("verified via MockCsiAdaptor",
response.getResponseMessage());
services.stop();
}
@Test
public void testMultipleCsiAdaptors() throws IOException, YarnException {
ServerSocket driver1Addr = new ServerSocket(0);
ServerSocket driver2Addr = new ServerSocket(0);
InetSocketAddress address1 =
new InetSocketAddress(driver1Addr.getLocalPort());
InetSocketAddress address2 =
new InetSocketAddress(driver2Addr.getLocalPort());
Configuration conf = new Configuration();
// Two csi-drivers configured
conf.set(YarnConfiguration.NM_CSI_DRIVER_NAMES,
"customized-driver-1,customized-driver-2");
// customized-driver-1
conf.setSocketAddr(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ "customized-driver-1.address", address1);
conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ "customized-driver-1.class",
"org.apache.hadoop.yarn.csi.adaptor.MockCsiAdaptor");
conf.set(YarnConfiguration.NM_CSI_DRIVER_PREFIX
+ "customized-driver-1.endpoint",
"unix:///tmp/customized-driver-1.sock");
// customized-driver-2
conf.setSocketAddr(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ "customized-driver-2.address", address2);
conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ "customized-driver-2.class",
"org.apache.hadoop.yarn.csi.adaptor.MockCsiAdaptor");
conf.set(YarnConfiguration.NM_CSI_DRIVER_PREFIX
+ "customized-driver-2.endpoint",
"unix:///tmp/customized-driver-2.sock");
driver1Addr.close();
driver2Addr.close();
CsiAdaptorServices services =
new CsiAdaptorServices();
services.init(conf);
services.start();
YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
CsiAdaptorProtocol client1 = NMProxy
.createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
NetUtils.createSocketAddrForHost("localhost",
driver1Addr.getLocalPort()));
// ***************************************************
// Verify talking with customized-driver-1
// ***************************************************
// Test getPluginInfo
GetPluginInfoResponse pluginInfo =
client1.getPluginInfo(GetPluginInfoRequest.newInstance());
Assert.assertEquals(pluginInfo.getDriverName(), "customized-driver-1");
Assert.assertEquals(pluginInfo.getVersion(), "1.0");
// Test validateVolumeCapacity
ValidateVolumeCapabilitiesRequest request =
ValidateVolumeCapabilitiesRequestPBImpl
.newInstance("driver-1-volume-00001",
ImmutableList.of(new ValidateVolumeCapabilitiesRequest
.VolumeCapability(
MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
ImmutableList.of())), ImmutableMap.of());
ValidateVolumeCapabilitiesResponse response = client1
.validateVolumeCapacity(request);
Assert.assertEquals(true, response.isSupported());
Assert.assertEquals("verified via MockCsiAdaptor",
response.getResponseMessage());
// ***************************************************
// Verify talking with customized-driver-2
// ***************************************************
CsiAdaptorProtocol client2 = NMProxy
.createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
NetUtils.createSocketAddrForHost("localhost",
driver2Addr.getLocalPort()));
GetPluginInfoResponse pluginInfo2 =
client2.getPluginInfo(GetPluginInfoRequest.newInstance());
Assert.assertEquals(pluginInfo2.getDriverName(), "customized-driver-2");
Assert.assertEquals(pluginInfo2.getVersion(), "1.0");
services.stop();
}
}