diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorPlugin.java
new file mode 100644
index 00000000000..26b45f7c31f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorPlugin.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * csi-adaptor is a plugin, user can provide customized implementation
+ * according to this interface. NM will init and load this into a NM aux
+ * service, and it can run multiple csi-adaptor servers.
+ *
+ * User needs to implement all the methods defined in
+ * {@link CsiAdaptorProtocol}, and plus the methods in this interface.
+ */
+public interface CsiAdaptorPlugin extends CsiAdaptorProtocol {
+
+ /**
+ * A csi-adaptor implementation can init its state within this function.
+ * Configuration is available so the implementation can retrieve some
+ * customized configuration from yarn-site.xml.
+ * @param driverName the name of the csi-driver.
+ * @param conf configuration.
+ * @throws YarnException
+ */
+ void init(String driverName, Configuration conf) throws YarnException;
+
+ /**
+ * Returns the driver name of the csi-driver this adaptor works with.
+ * The name should be consistent on all the places being used, ideally
+ * it should come from the value when init is done.
+ * @return the name of the csi-driver that this adaptor works with.
+ */
+ String getDriverName();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 2be73e1d966..de66e7525e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3489,6 +3489,8 @@ public class YarnConfiguration extends Configuration {
".endpoint";
public static final String NM_CSI_ADAPTOR_ADDRESS_SUFFIX =
".address";
+ public static final String NM_CSI_ADAPTOR_CLASS =
+ ".class";
/**
* One or more socket addresses for csi-adaptor.
* Multiple addresses are delimited by ",".
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2508c48d1ce..45894e9461f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4096,12 +4096,20 @@
CSI driver names running on this node, multiple driver names need to
be delimited by comma. The driver name should be same value returned
- by the getPluginInfo call. For each of the CSI driver name, it must
+ by the getPluginInfo call.For each of the CSI driver name, it must
to define following two corresponding properties:
"yarn.nodemanager.csi-driver.${NAME}.endpoint"
"yarn.nodemanager.csi-driver-adaptor.${NAME}.address"
The 1st property defines where the driver's endpoint is;
2nd property defines where the mapping csi-driver-adaptor's address is.
+ What's more, an optional csi-driver-adaptor class can be defined
+ for each csi-driver:
+ "yarn.nodemanager.csi-driver.${NAME}.class"
+ once given, the adaptor will be initiated with the given class instead
+ of the default implementation
+ org.apache.hadoop.yarn.csi.adaptor.DefaultCsiAdaptorImpl. User can plug
+ customized adaptor code for csi-driver with this configuration
+ if necessary.
yarn.nodemanager.csi-driver.names
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorFactory.java
new file mode 100644
index 00000000000..955c36f81f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorFactory.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Desired csi-adaptor implementation is configurable, default to
+ * CsiAdaptorProtocolService. If user wants to have a different implementation,
+ * just to configure a different class for the csi-driver.
+ */
+public final class CsiAdaptorFactory {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CsiAdaptorFactory.class);
+
+ private CsiAdaptorFactory() {
+ // hide constructor for this factory class.
+ }
+
+ /**
+ * Load csi-driver-adaptor from configuration. If the configuration is not
+ * specified, the default implementation
+ * for the adaptor is {@link DefaultCsiAdaptorImpl}. If the configured class
+ * is not a valid variation of {@link CsiAdaptorPlugin} or the class cannot
+ * be found, this function will throw a RuntimeException.
+ * @param driverName
+ * @param conf
+ * @return CsiAdaptorPlugin
+ * @throws YarnException if unable to create the adaptor class.
+ * @throws RuntimeException if given class is not found or not
+ * an instance of {@link CsiAdaptorPlugin}
+ */
+ public static CsiAdaptorPlugin getAdaptor(String driverName,
+ Configuration conf) throws YarnException {
+ // load configuration
+ String configName = YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ + driverName + YarnConfiguration.NM_CSI_ADAPTOR_CLASS;
+ Class extends CsiAdaptorPlugin> impl = conf.getClass(configName,
+ DefaultCsiAdaptorImpl.class, CsiAdaptorPlugin.class);
+ if (impl == null) {
+ throw new YarnException("Unable to init csi-adaptor from the"
+ + " class specified via " + configName);
+ }
+
+ // init the adaptor
+ CsiAdaptorPlugin instance = ReflectionUtils.newInstance(impl, conf);
+ LOG.info("csi-adaptor initiated, implementation: "
+ + impl.getCanonicalName());
+ return instance;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
index 7020f061f99..300c5bcaa26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
@@ -17,11 +17,11 @@
*/
package org.apache.hadoop.yarn.csi.adaptor;
-import com.google.common.annotations.VisibleForTesting;
-import csi.v0.Csi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
@@ -30,27 +30,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
-import org.apache.hadoop.yarn.csi.client.CsiClient;
-import org.apache.hadoop.yarn.csi.client.CsiClientImpl;
-import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
/**
* This is a Hadoop RPC server, we uses the Hadoop RPC framework here
* because we need to stick to the security model current Hadoop supports.
*/
-public class CsiAdaptorProtocolService extends AuxiliaryService
+public class CsiAdaptorProtocolService extends AbstractService
implements CsiAdaptorProtocol {
private static final Logger LOG =
@@ -58,35 +51,17 @@ public class CsiAdaptorProtocolService extends AuxiliaryService
private Server server;
private InetSocketAddress adaptorServiceAddress;
- private CsiClient csiClient;
- private String csiDriverName;
+ private CsiAdaptorPlugin serverImpl;
- public CsiAdaptorProtocolService() {
+ public CsiAdaptorProtocolService(CsiAdaptorPlugin adaptorImpl) {
super(CsiAdaptorProtocolService.class.getName());
- // TODO read this from configuration
- this.csiDriverName = "ch.ctrox.csi.s3-driver";
- }
-
- public CsiAdaptorProtocolService(String driverName,
- String domainSocketPath) {
- super(CsiAdaptorProtocolService.class.getName());
- this.csiClient = new CsiClientImpl(domainSocketPath);
- this.csiDriverName = driverName;
- }
-
- @VisibleForTesting
- public void setCsiClient(CsiClient client) {
- this.csiClient = client;
+ this.serverImpl = adaptorImpl;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
-
- String driverEndpoint = CsiConfigUtils
- .getCsiDriverEndpoint(csiDriverName, conf);
- this.csiClient = new CsiClientImpl(driverEndpoint);
adaptorServiceAddress = CsiConfigUtils
- .getCsiAdaptorAddressForDriver(csiDriverName, conf);
+ .getCsiAdaptorAddressForDriver(serverImpl.getDriverName(), conf);
super.serviceInit(conf);
}
@@ -96,7 +71,7 @@ public class CsiAdaptorProtocolService extends AuxiliaryService
YarnRPC rpc = YarnRPC.create(conf);
this.server = rpc.getServer(
CsiAdaptorProtocol.class,
- this, adaptorServiceAddress, conf, null, 1);
+ serverImpl, adaptorServiceAddress, conf, null, 1);
this.server.start();
LOG.info("{} started, listening on address: {}",
CsiAdaptorProtocolService.class.getName(),
@@ -115,76 +90,25 @@ public class CsiAdaptorProtocolService extends AuxiliaryService
@Override
public GetPluginInfoResponse getPluginInfo(
GetPluginInfoRequest request) throws YarnException, IOException {
- Csi.GetPluginInfoResponse response = csiClient.getPluginInfo();
- return ProtoTranslatorFactory.getTranslator(
- GetPluginInfoResponse.class, Csi.GetPluginInfoResponse.class)
- .convertFrom(response);
+ return serverImpl.getPluginInfo(request);
}
@Override
public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
ValidateVolumeCapabilitiesRequest request) throws YarnException,
IOException {
- Csi.ValidateVolumeCapabilitiesRequest req = ProtoTranslatorFactory
- .getTranslator(ValidateVolumeCapabilitiesRequest.class,
- Csi.ValidateVolumeCapabilitiesRequest.class)
- .convertTo(request);
- Csi.ValidateVolumeCapabilitiesResponse response =
- csiClient.validateVolumeCapabilities(req);
- return ProtoTranslatorFactory.getTranslator(
- ValidateVolumeCapabilitiesResponse.class,
- Csi.ValidateVolumeCapabilitiesResponse.class)
- .convertFrom(response);
+ return serverImpl.validateVolumeCapacity(request);
}
@Override
public NodePublishVolumeResponse nodePublishVolume(
NodePublishVolumeRequest request) throws YarnException, IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received nodePublishVolume call, request: {}",
- request.toString());
- }
- Csi.NodePublishVolumeRequest req = ProtoTranslatorFactory
- .getTranslator(NodePublishVolumeRequest.class,
- Csi.NodePublishVolumeRequest.class).convertTo(request);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Translate to CSI proto message: {}", req.toString());
- }
- csiClient.nodePublishVolume(req);
- return NodePublishVolumeResponse.newInstance();
+ return serverImpl.nodePublishVolume(request);
}
@Override
public NodeUnpublishVolumeResponse nodeUnpublishVolume(
NodeUnpublishVolumeRequest request) throws YarnException, IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received nodeUnpublishVolume call, request: {}",
- request.toString());
- }
- Csi.NodeUnpublishVolumeRequest req = ProtoTranslatorFactory
- .getTranslator(NodeUnpublishVolumeRequest.class,
- Csi.NodeUnpublishVolumeRequest.class).convertTo(request);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Translate to CSI proto message: {}", req.toString());
- }
- csiClient.nodeUnpublishVolume(req);
- return NodeUnpublishVolumeResponse.newInstance();
- }
-
- @Override
- public void initializeApplication(
- ApplicationInitializationContext initAppContext) {
- // do nothing
- }
-
- @Override
- public void stopApplication(
- ApplicationTerminationContext stopAppContext) {
- // do nothing
- }
-
- @Override
- public ByteBuffer getMetaData() {
- return ByteBuffer.allocate(0);
+ return serverImpl.nodeUnpublishVolume(request);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorServices.java
new file mode 100644
index 00000000000..78debbe5baa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorServices.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * NM manages csi-adaptors as a single NM AUX service, this service
+ * manages a set of rpc services and each of them serves one particular
+ * csi-driver. It loads all available drivers from configuration, and
+ * find a csi-driver-adaptor implementation class for each of them. At last
+ * it brings up all of them as a composite service.
+ */
+public class CsiAdaptorServices extends AuxiliaryService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CsiAdaptorServices.class);
+
+ private List serviceList;
+ protected CsiAdaptorServices() {
+ super(CsiAdaptorServices.class.getName());
+ serviceList = new ArrayList<>();
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ // load configuration and init adaptors
+ String[] names = CsiConfigUtils.getCsiDriverNames(conf);
+ if (names != null && names.length > 0) {
+ for (String driverName : names) {
+ LOG.info("Adding csi-driver-adaptor for csi-driver {}", driverName);
+ CsiAdaptorPlugin serviceImpl = CsiAdaptorFactory
+ .getAdaptor(driverName, conf);
+ serviceImpl.init(driverName, conf);
+ CsiAdaptorProtocolService service =
+ new CsiAdaptorProtocolService(serviceImpl);
+ serviceList.add(service);
+ service.serviceInit(conf);
+ }
+ }
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (serviceList != null && serviceList.size() > 0) {
+ for (CsiAdaptorProtocolService service : serviceList) {
+ try {
+ service.serviceStop();
+ } catch (Exception e) {
+ LOG.warn("Unable to stop service " + service.getName(), e);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ if (serviceList != null && serviceList.size() > 0) {
+ for (CsiAdaptorProtocolService service : serviceList) {
+ service.serviceStart();
+ }
+ }
+ }
+
+ @Override
+ public void initializeApplication(
+ ApplicationInitializationContext initAppContext) {
+ // do nothing
+ }
+
+ @Override
+ public void stopApplication(
+ ApplicationTerminationContext stopAppContext) {
+ // do nothing
+ }
+
+ @Override
+ public ByteBuffer getMetaData() {
+ return ByteBuffer.allocate(0);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/DefaultCsiAdaptorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/DefaultCsiAdaptorImpl.java
new file mode 100644
index 00000000000..a2035878bb7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/DefaultCsiAdaptorImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import csi.v0.Csi;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.csi.client.CsiClient;
+import org.apache.hadoop.yarn.csi.client.CsiClientImpl;
+import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The default implementation of csi-driver-adaptor service.
+ */
+public class DefaultCsiAdaptorImpl implements CsiAdaptorPlugin {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DefaultCsiAdaptorImpl.class);
+
+ private String driverName;
+ private CsiClient csiClient;
+
+ public DefaultCsiAdaptorImpl() {
+ // use default constructor for reflection
+ }
+
+ @Override
+ public void init(String driverName, Configuration conf)
+ throws YarnException {
+ // if the driver end point is invalid, following code will fail.
+ String driverEndpoint = CsiConfigUtils
+ .getCsiDriverEndpoint(driverName, conf);
+ LOG.info("This csi-adaptor is configured to contact with"
+ + " the csi-driver {} via gRPC endpoint: {}",
+ driverName, driverEndpoint);
+ this.csiClient = new CsiClientImpl(driverEndpoint);
+ this.driverName = driverName;
+ }
+
+ @Override
+ public String getDriverName() {
+ return driverName;
+ }
+
+ @Override
+ public GetPluginInfoResponse getPluginInfo(
+ GetPluginInfoRequest request) throws YarnException, IOException {
+ Csi.GetPluginInfoResponse response = csiClient.getPluginInfo();
+ return ProtoTranslatorFactory.getTranslator(
+ GetPluginInfoResponse.class, Csi.GetPluginInfoResponse.class)
+ .convertFrom(response);
+ }
+
+ @Override
+ public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+ ValidateVolumeCapabilitiesRequest request) throws YarnException,
+ IOException {
+ Csi.ValidateVolumeCapabilitiesRequest req = ProtoTranslatorFactory
+ .getTranslator(ValidateVolumeCapabilitiesRequest.class,
+ Csi.ValidateVolumeCapabilitiesRequest.class)
+ .convertTo(request);
+ Csi.ValidateVolumeCapabilitiesResponse response =
+ csiClient.validateVolumeCapabilities(req);
+ return ProtoTranslatorFactory.getTranslator(
+ ValidateVolumeCapabilitiesResponse.class,
+ Csi.ValidateVolumeCapabilitiesResponse.class)
+ .convertFrom(response);
+ }
+
+ @Override
+ public NodePublishVolumeResponse nodePublishVolume(
+ NodePublishVolumeRequest request) throws YarnException, IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received nodePublishVolume call, request: {}",
+ request.toString());
+ }
+ Csi.NodePublishVolumeRequest req = ProtoTranslatorFactory
+ .getTranslator(NodePublishVolumeRequest.class,
+ Csi.NodePublishVolumeRequest.class).convertTo(request);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translate to CSI proto message: {}", req.toString());
+ }
+ csiClient.nodePublishVolume(req);
+ return NodePublishVolumeResponse.newInstance();
+ }
+
+ @Override
+ public NodeUnpublishVolumeResponse nodeUnpublishVolume(
+ NodeUnpublishVolumeRequest request) throws YarnException, IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received nodeUnpublishVolume call, request: {}",
+ request.toString());
+ }
+ Csi.NodeUnpublishVolumeRequest req = ProtoTranslatorFactory
+ .getTranslator(NodeUnpublishVolumeRequest.class,
+ Csi.NodeUnpublishVolumeRequest.class).convertTo(request);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translate to CSI proto message: {}", req.toString());
+ }
+ csiClient.nodeUnpublishVolume(req);
+ return NodeUnpublishVolumeResponse.newInstance();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/MockCsiAdaptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/MockCsiAdaptor.java
new file mode 100644
index 00000000000..4bcc5092861
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/MockCsiAdaptor.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+
+/**
+ * This class is used by {@link TestCsiAdaptorService} for testing.
+ * It gives some dummy implementation for a adaptor plugin, and used to
+ * verify the plugin can be properly loaded by NM and execution logic is
+ * as expected.
+ *
+ * This is created as a separated class instead of an inner class, because
+ * {@link CsiAdaptorServices} is loading classes using conf.getClass(),
+ * the utility class is unable to resolve inner classes.
+ */
+public class MockCsiAdaptor implements CsiAdaptorPlugin {
+
+ private String driverName;
+
+ @Override
+ public void init(String driverName, Configuration conf)
+ throws YarnException {
+ this.driverName = driverName;
+ }
+
+ @Override
+ public String getDriverName() {
+ return this.driverName;
+ }
+
+ @Override
+ public GetPluginInfoResponse getPluginInfo(
+ GetPluginInfoRequest request) throws YarnException, IOException {
+ return GetPluginInfoResponse.newInstance(driverName,
+ "1.0");
+ }
+
+ @Override
+ public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+ ValidateVolumeCapabilitiesRequest request)
+ throws YarnException, IOException {
+ return ValidateVolumeCapabilitiesResponse.newInstance(true,
+ "verified via MockCsiAdaptor");
+ }
+
+ @Override
+ public NodePublishVolumeResponse nodePublishVolume(
+ NodePublishVolumeRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ @Override
+ public NodeUnpublishVolumeResponse nodeUnpublishVolume(
+ NodeUnpublishVolumeRequest request) throws YarnException, IOException {
+ return null;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
index d6ee231b0c1..c415ced7488 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
@@ -27,13 +27,19 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
import org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
import org.apache.hadoop.yarn.client.NMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.csi.client.ICsiClientTest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.junit.AfterClass;
@@ -72,6 +78,39 @@ public class TestCsiAdaptorService {
}
}
+ private interface FakeCsiAdaptor extends CsiAdaptorPlugin {
+
+ default void init(String driverName, Configuration conf)
+ throws YarnException {
+ return;
+ }
+
+ default String getDriverName() {
+ return null;
+ }
+
+ default GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request)
+ throws YarnException, IOException {
+ return null;
+ }
+
+ default ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+ ValidateVolumeCapabilitiesRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
+
+ default NodePublishVolumeResponse nodePublishVolume(
+ NodePublishVolumeRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ default NodeUnpublishVolumeResponse nodeUnpublishVolume(
+ NodeUnpublishVolumeRequest request) throws YarnException, IOException{
+ return null;
+ }
+ }
+
@Test
public void testValidateVolume() throws IOException, YarnException {
ServerSocket ss = new ServerSocket(0);
@@ -83,50 +122,52 @@ public class TestCsiAdaptorService {
address);
conf.set(
YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint",
- "unix:///tmp/test-driver.scok");
- CsiAdaptorProtocolService service =
- new CsiAdaptorProtocolService("test-driver", domainSocket);
- service.init(conf);
- service.start();
+ "unix:///tmp/test-driver.sock");
- // inject a fake CSI client
+ // inject a fake CSI adaptor
// this client validates if the ValidateVolumeCapabilitiesRequest
// is integrity, and then reply a fake response
- service.setCsiClient(new ICsiClientTest() {
+ CsiAdaptorPlugin plugin = new FakeCsiAdaptor() {
@Override
- public Csi.GetPluginInfoResponse getPluginInfo() {
- return Csi.GetPluginInfoResponse.newBuilder()
- .setName("test-plugin")
- .setVendorVersion("0.1")
- .build();
+ public String getDriverName() {
+ return "test-driver";
+ }
+
+
+ @Override
+ public GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request) {
+ return GetPluginInfoResponse.newInstance("test-plugin", "0.1");
}
@Override
- public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
- Csi.ValidateVolumeCapabilitiesRequest request) {
+ public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+ ValidateVolumeCapabilitiesRequest request) throws YarnException,
+ IOException {
// validate we get all info from the request
Assert.assertEquals("volume-id-0000123", request.getVolumeId());
- Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
+ Assert.assertEquals(1, request.getVolumeCapabilities().size());
Assert.assertEquals(Csi.VolumeCapability.AccessMode
- .newBuilder().setModeValue(5).build(),
- request.getVolumeCapabilities(0).getAccessMode());
- Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
- Assert.assertEquals(2, request.getVolumeCapabilities(0)
- .getMount().getMountFlagsCount());
- Assert.assertTrue(request.getVolumeCapabilities(0)
- .getMount().getMountFlagsList().contains("mountFlag1"));
- Assert.assertTrue(request.getVolumeCapabilities(0)
- .getMount().getMountFlagsList().contains("mountFlag2"));
- Assert.assertEquals(2, request.getVolumeAttributesCount());
- Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
- Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
+ .newBuilder().setModeValue(5).build().getMode().name(),
+ request.getVolumeCapabilities().get(0).getAccessMode().name());
+ Assert.assertEquals(2, request.getVolumeCapabilities().get(0)
+ .getMountFlags().size());
+ Assert.assertTrue(request.getVolumeCapabilities().get(0)
+ .getMountFlags().contains("mountFlag1"));
+ Assert.assertTrue(request.getVolumeCapabilities().get(0)
+ .getMountFlags().contains("mountFlag2"));
+ Assert.assertEquals(2, request.getVolumeAttributes().size());
+ Assert.assertEquals("v1", request.getVolumeAttributes().get("k1"));
+ Assert.assertEquals("v2", request.getVolumeAttributes().get("k2"));
// return a fake result
- return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
- .setSupported(false)
- .setMessage("this is a test")
- .build();
+ return ValidateVolumeCapabilitiesResponse
+ .newInstance(false, "this is a test");
}
- });
+ };
+
+ CsiAdaptorProtocolService service =
+ new CsiAdaptorProtocolService(plugin);
+ service.init(conf);
+ service.start();
try (CsiAdaptorProtocolPBClientImpl client =
new CsiAdaptorProtocolPBClientImpl(1L, address, new Configuration())) {
@@ -161,50 +202,53 @@ public class TestCsiAdaptorService {
address);
conf.set(
YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint",
- "unix:///tmp/test-driver.scok");
- CsiAdaptorProtocolService service =
- new CsiAdaptorProtocolService("test-driver", domainSocket);
- service.init(conf);
- service.start();
+ "unix:///tmp/test-driver.sock");
- // inject a fake CSI client
+ // inject a fake CSI adaptor
// this client validates if the ValidateVolumeCapabilitiesRequest
// is integrity, and then reply a fake response
- service.setCsiClient(new ICsiClientTest() {
+ FakeCsiAdaptor plugin = new FakeCsiAdaptor() {
@Override
- public Csi.GetPluginInfoResponse getPluginInfo() {
- return Csi.GetPluginInfoResponse.newBuilder()
- .setName("test-plugin")
- .setVendorVersion("0.1")
- .build();
+ public String getDriverName() {
+ return "test-driver";
}
@Override
- public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
- Csi.ValidateVolumeCapabilitiesRequest request) {
+ public GetPluginInfoResponse getPluginInfo(
+ GetPluginInfoRequest request) throws YarnException, IOException {
+ return GetPluginInfoResponse.newInstance("test-plugin", "0.1");
+ }
+
+ @Override
+ public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+ ValidateVolumeCapabilitiesRequest request)
+ throws YarnException, IOException {
// validate we get all info from the request
Assert.assertEquals("volume-id-0000123", request.getVolumeId());
- Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
- Assert.assertEquals(Csi.VolumeCapability.AccessMode
- .newBuilder().setModeValue(5).build(),
- request.getVolumeCapabilities(0).getAccessMode());
- Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
- Assert.assertEquals(2, request.getVolumeCapabilities(0)
- .getMount().getMountFlagsCount());
- Assert.assertTrue(request.getVolumeCapabilities(0)
- .getMount().getMountFlagsList().contains("mountFlag1"));
- Assert.assertTrue(request.getVolumeCapabilities(0)
- .getMount().getMountFlagsList().contains("mountFlag2"));
- Assert.assertEquals(2, request.getVolumeAttributesCount());
- Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
- Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
+ Assert.assertEquals(1, request.getVolumeCapabilities().size());
+ Assert.assertEquals(
+ Csi.VolumeCapability.AccessMode.newBuilder().setModeValue(5)
+ .build().getMode().name(),
+ request.getVolumeCapabilities().get(0).getAccessMode().name());
+ Assert.assertEquals(2,
+ request.getVolumeCapabilities().get(0).getMountFlags().size());
+ Assert.assertTrue(request.getVolumeCapabilities().get(0).getMountFlags()
+ .contains("mountFlag1"));
+ Assert.assertTrue(request.getVolumeCapabilities().get(0).getMountFlags()
+ .contains("mountFlag2"));
+ Assert.assertEquals(2, request.getVolumeAttributes().size());
+ Assert.assertEquals("v1", request.getVolumeAttributes().get("k1"));
+ Assert.assertEquals("v2", request.getVolumeAttributes().get("k2"));
// return a fake result
- return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
- .setSupported(false)
- .setMessage("this is a test")
- .build();
+ return ValidateVolumeCapabilitiesResponse
+ .newInstance(false, "this is a test");
}
- });
+ };
+
+ CsiAdaptorProtocolService service =
+ new CsiAdaptorProtocolService(plugin);
+ service.init(conf);
+ service.start();
YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
@@ -232,7 +276,7 @@ public class TestCsiAdaptorService {
public void testMissingConfiguration() {
Configuration conf = new Configuration();
CsiAdaptorProtocolService service =
- new CsiAdaptorProtocolService("test-driver", domainSocket);
+ new CsiAdaptorProtocolService(new FakeCsiAdaptor() {});
service.init(conf);
}
@@ -243,7 +287,7 @@ public class TestCsiAdaptorService {
+ "test-driver-0001.address",
"0.0.0.0:-100"); // this is an invalid address
CsiAdaptorProtocolService service =
- new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
+ new CsiAdaptorProtocolService(new FakeCsiAdaptor() {});
service.init(conf);
}
@@ -254,7 +298,151 @@ public class TestCsiAdaptorService {
+ "test-driver-0001.address",
"192.0.1:8999"); // this is an invalid ip address
CsiAdaptorProtocolService service =
- new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
+ new CsiAdaptorProtocolService(new FakeCsiAdaptor() {});
service.init(conf);
}
+
+ @Test
+ public void testCustomizedAdaptor() throws IOException, YarnException {
+ ServerSocket ss = new ServerSocket(0);
+ ss.close();
+ InetSocketAddress address = new InetSocketAddress(ss.getLocalPort());
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.NM_CSI_DRIVER_NAMES, "customized-driver");
+ conf.setSocketAddr(
+ YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "customized-driver.address",
+ address);
+ conf.set(
+ YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "customized-driver.class",
+ "org.apache.hadoop.yarn.csi.adaptor.MockCsiAdaptor");
+ conf.set(
+ YarnConfiguration.NM_CSI_DRIVER_PREFIX + "customized-driver.endpoint",
+ "unix:///tmp/customized-driver.sock");
+
+ CsiAdaptorServices services =
+ new CsiAdaptorServices();
+ services.init(conf);
+ services.start();
+
+ YarnRPC rpc = YarnRPC.create(conf);
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ CsiAdaptorProtocol adaptorClient = NMProxy
+ .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+ NetUtils.createSocketAddrForHost("localhost", ss.getLocalPort()));
+
+ // Test getPluginInfo
+ GetPluginInfoResponse pluginInfo =
+ adaptorClient.getPluginInfo(GetPluginInfoRequest.newInstance());
+ Assert.assertEquals(pluginInfo.getDriverName(), "customized-driver");
+ Assert.assertEquals(pluginInfo.getVersion(), "1.0");
+
+ // Test validateVolumeCapacity
+ ValidateVolumeCapabilitiesRequest request =
+ ValidateVolumeCapabilitiesRequestPBImpl
+ .newInstance("volume-id-0000123",
+ ImmutableList.of(new ValidateVolumeCapabilitiesRequest
+ .VolumeCapability(
+ MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+ ImmutableList.of("mountFlag1", "mountFlag2"))),
+ ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+ ValidateVolumeCapabilitiesResponse response = adaptorClient
+ .validateVolumeCapacity(request);
+ Assert.assertEquals(true, response.isSupported());
+ Assert.assertEquals("verified via MockCsiAdaptor",
+ response.getResponseMessage());
+
+ services.stop();
+ }
+
+ @Test
+ public void testMultipleCsiAdaptors() throws IOException, YarnException {
+ ServerSocket driver1Addr = new ServerSocket(0);
+ ServerSocket driver2Addr = new ServerSocket(0);
+
+ InetSocketAddress address1 =
+ new InetSocketAddress(driver1Addr.getLocalPort());
+ InetSocketAddress address2 =
+ new InetSocketAddress(driver2Addr.getLocalPort());
+
+ Configuration conf = new Configuration();
+
+ // Two csi-drivers configured
+ conf.set(YarnConfiguration.NM_CSI_DRIVER_NAMES,
+ "customized-driver-1,customized-driver-2");
+
+ // customized-driver-1
+ conf.setSocketAddr(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ + "customized-driver-1.address", address1);
+ conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ + "customized-driver-1.class",
+ "org.apache.hadoop.yarn.csi.adaptor.MockCsiAdaptor");
+ conf.set(YarnConfiguration.NM_CSI_DRIVER_PREFIX
+ + "customized-driver-1.endpoint",
+ "unix:///tmp/customized-driver-1.sock");
+
+ // customized-driver-2
+ conf.setSocketAddr(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ + "customized-driver-2.address", address2);
+ conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ + "customized-driver-2.class",
+ "org.apache.hadoop.yarn.csi.adaptor.MockCsiAdaptor");
+ conf.set(YarnConfiguration.NM_CSI_DRIVER_PREFIX
+ + "customized-driver-2.endpoint",
+ "unix:///tmp/customized-driver-2.sock");
+
+ driver1Addr.close();
+ driver2Addr.close();
+
+ CsiAdaptorServices services =
+ new CsiAdaptorServices();
+ services.init(conf);
+ services.start();
+
+ YarnRPC rpc = YarnRPC.create(conf);
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ CsiAdaptorProtocol client1 = NMProxy
+ .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+ NetUtils.createSocketAddrForHost("localhost",
+ driver1Addr.getLocalPort()));
+
+ // ***************************************************
+ // Verify talking with customized-driver-1
+ // ***************************************************
+ // Test getPluginInfo
+ GetPluginInfoResponse pluginInfo =
+ client1.getPluginInfo(GetPluginInfoRequest.newInstance());
+ Assert.assertEquals(pluginInfo.getDriverName(), "customized-driver-1");
+ Assert.assertEquals(pluginInfo.getVersion(), "1.0");
+
+ // Test validateVolumeCapacity
+ ValidateVolumeCapabilitiesRequest request =
+ ValidateVolumeCapabilitiesRequestPBImpl
+ .newInstance("driver-1-volume-00001",
+ ImmutableList.of(new ValidateVolumeCapabilitiesRequest
+ .VolumeCapability(
+ MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+ ImmutableList.of())), ImmutableMap.of());
+
+ ValidateVolumeCapabilitiesResponse response = client1
+ .validateVolumeCapacity(request);
+ Assert.assertEquals(true, response.isSupported());
+ Assert.assertEquals("verified via MockCsiAdaptor",
+ response.getResponseMessage());
+
+
+ // ***************************************************
+ // Verify talking with customized-driver-2
+ // ***************************************************
+ CsiAdaptorProtocol client2 = NMProxy
+ .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+ NetUtils.createSocketAddrForHost("localhost",
+ driver2Addr.getLocalPort()));
+ GetPluginInfoResponse pluginInfo2 =
+ client2.getPluginInfo(GetPluginInfoRequest.newInstance());
+ Assert.assertEquals(pluginInfo2.getDriverName(), "customized-driver-2");
+ Assert.assertEquals(pluginInfo2.getVersion(), "1.0");
+
+ services.stop();
+ }
}