diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java index 0822163dc85..4e064eb2196 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java @@ -19,6 +19,10 @@ package org.apache.hadoop.yarn.api; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -30,10 +34,50 @@ import java.io.IOException; */ public interface CsiAdaptorProtocol { + /** + * Get plugin info from the CSI driver. The driver usually returns + * the name of the driver and its version. + * @param request get plugin info request. + * @return response that contains driver name and its version. + * @throws YarnException + * @throws IOException + */ GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request) throws YarnException, IOException; + /** + * Validate if the volume capacity can be satisfied on the underneath + * storage system. This method responses if the capacity can be satisfied + * or not, with a detailed message. + * @param request validate volume capability request. + * @return validation response. + * @throws YarnException + * @throws IOException + */ ValidateVolumeCapabilitiesResponse validateVolumeCapacity( ValidateVolumeCapabilitiesRequest request) throws YarnException, IOException; + + /** + * Publish the volume on a node manager, the volume will be mounted + * to the local file system and become visible for clients. + * @param request publish volume request. + * @return publish volume response. + * @throws YarnException + * @throws IOException + */ + NodePublishVolumeResponse nodePublishVolume( + NodePublishVolumeRequest request) throws YarnException, IOException; + + /** + * This is a reverse operation of + * {@link #nodePublishVolume(NodePublishVolumeRequest)}, it un-mounts the + * volume from given node. + * @param request un-publish volume request. + * @return un-publish volume response. + * @throws YarnException + * @throws IOException + */ + NodeUnpublishVolumeResponse nodeUnpublishVolume( + NodeUnpublishVolumeRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeRequest.java new file mode 100644 index 00000000000..43c605fc68d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeRequest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords; + +import com.google.gson.JsonObject; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability; +import org.apache.hadoop.yarn.util.Records; + +import java.util.Map; + +/** + * The request sent by node manager to CSI driver adaptor + * to publish a volume on a node. + */ +public abstract class NodePublishVolumeRequest { + + public static NodePublishVolumeRequest newInstance(String volumeId, + boolean readOnly, String targetPath, String stagingPath, + VolumeCapability capability, + Map publishContext, + Map secrets) { + NodePublishVolumeRequest request = + Records.newRecord(NodePublishVolumeRequest.class); + request.setVolumeId(volumeId); + request.setReadonly(readOnly); + request.setTargetPath(targetPath); + request.setStagingPath(stagingPath); + request.setVolumeCapability(capability); + request.setPublishContext(publishContext); + request.setSecrets(secrets); + return request; + } + + public abstract void setVolumeId(String volumeId); + + public abstract String getVolumeId(); + + public abstract void setReadonly(boolean readonly); + + public abstract boolean getReadOnly(); + + public abstract void setTargetPath(String targetPath); + + public abstract String getTargetPath(); + + public abstract void setStagingPath(String stagingPath); + + public abstract String getStagingPath(); + + public abstract void setVolumeCapability(VolumeCapability capability); + + public abstract VolumeCapability getVolumeCapability(); + + public abstract void setPublishContext(Map publishContext); + + public abstract Map getPublishContext(); + + public abstract void setSecrets(Map secrets); + + public abstract Map getSecrets(); + + public String toString() { + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("VolumeId", getVolumeId()); + jsonObject.addProperty("ReadOnly", getReadOnly()); + jsonObject.addProperty("TargetPath", getTargetPath()); + jsonObject.addProperty("StagingPath", getStagingPath()); + if (getVolumeCapability() != null) { + JsonObject jsonCap = new JsonObject(); + jsonCap.addProperty("AccessMode", + getVolumeCapability().getAccessMode().name()); + jsonCap.addProperty("VolumeType", + getVolumeCapability().getVolumeType().name()); + jsonObject.addProperty("VolumeCapability", + jsonCap.toString()); + } + return jsonObject.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeResponse.java new file mode 100644 index 00000000000..c2377aa4283 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeResponse.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.yarn.util.Records; + +/** + * The response sent by a CSI driver adaptor to the node manager + * after publishing a volume on the node. + */ +public abstract class NodePublishVolumeResponse { + + public static NodePublishVolumeResponse newInstance() { + return Records.newRecord(NodePublishVolumeResponse.class); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeRequest.java new file mode 100644 index 00000000000..48c16e2059d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeRequest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.yarn.util.Records; + +/** + * The request sent by node manager to CSI driver adaptor + * to un-publish a volume on a node. + */ +public abstract class NodeUnpublishVolumeRequest { + + public static NodeUnpublishVolumeRequest newInstance(String volumeId, + String targetPath) { + NodeUnpublishVolumeRequest request = + Records.newRecord(NodeUnpublishVolumeRequest.class); + request.setVolumeId(volumeId); + request.setTargetPath(targetPath); + return request; + } + + public abstract void setVolumeId(String volumeId); + + public abstract void setTargetPath(String targetPath); + + public abstract String getVolumeId(); + + public abstract String getTargetPath(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeResponse.java new file mode 100644 index 00000000000..1b339c0c35b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeResponse.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.yarn.util.Records; + +/** + * The response sent by a CSI driver adaptor to the node manager + * after un-publishing a volume on the node. + */ +public class NodeUnpublishVolumeResponse { + + public static NodeUnpublishVolumeResponse newInstance() { + return Records.newRecord(NodeUnpublishVolumeResponse.class); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java index 047c09ac8b2..4209ca78ce6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java @@ -276,10 +276,10 @@ public class ResourceInformation implements Comparable { } public static ResourceInformation newInstance(String name, String units, - long value, Map attributes) { + long value, Set tags, Map attributes) { return ResourceInformation .newInstance(name, units, value, ResourceTypes.COUNTABLE, 0L, - Long.MAX_VALUE, null, attributes); + Long.MAX_VALUE, tags, attributes); } public static ResourceInformation newInstance(String name, String units, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3bc9957020f..511edefd3d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3443,13 +3443,28 @@ public class YarnConfiguration extends Configuration { // CSI Volume configs //////////////////////////////// /** - * One or more socket addresses for csi-adaptor. - * Multiple addresses are delimited by ",". + * TERMS: + * csi-driver: a 3rd party CSI driver which implements the CSI protocol. + * It is provided by the storage system. + * csi-driver-adaptor: this is an internal RPC service working + * as a bridge between YARN and a csi-driver. */ public static final String NM_CSI_ADAPTOR_PREFIX = NM_PREFIX + "csi-driver-adaptor."; + public static final String NM_CSI_DRIVER_PREFIX = + NM_PREFIX + "csi-driver."; + public static final String NM_CSI_DRIVER_ENDPOINT_SUFFIX = + ".endpoint"; + public static final String NM_CSI_ADAPTOR_ADDRESS_SUFFIX = + ".address"; + /** + * One or more socket addresses for csi-adaptor. + * Multiple addresses are delimited by ",". + */ public static final String NM_CSI_ADAPTOR_ADDRESSES = NM_CSI_ADAPTOR_PREFIX + "addresses"; + public static final String NM_CSI_DRIVER_NAMES = + NM_CSI_DRIVER_PREFIX + "names"; //////////////////////////////// // Other Configs diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/CsiConfigUtils.java similarity index 67% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/CsiConfigUtils.java index 77e69551806..e1177053fdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/CsiConfigUtils.java @@ -15,8 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.csi.utils; +package org.apache.hadoop.yarn.util.csi; +import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -24,13 +25,30 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import java.net.InetSocketAddress; /** - * Utility class to load configurations. + * Utility class for CSI in the API level. */ -public final class ConfigUtils { +public final class CsiConfigUtils { - private ConfigUtils() { + private CsiConfigUtils() { // Hide constructor for utility class. } + + public static String[] getCsiDriverNames(Configuration conf) { + return conf.getStrings(YarnConfiguration.NM_CSI_DRIVER_NAMES); + } + + public static String getCsiDriverEndpoint(String driverName, + Configuration conf) throws YarnException { + String driverEndpointProperty = YarnConfiguration.NM_CSI_DRIVER_PREFIX + + driverName + YarnConfiguration.NM_CSI_DRIVER_ENDPOINT_SUFFIX; + String driverEndpoint = conf.get(driverEndpointProperty); + if (Strings.isNullOrEmpty(driverEndpoint)) { + throw new YarnException("CSI driver's endpoint is not specified or" + + " invalid, property "+ driverEndpointProperty + " is not defined"); + } + return driverEndpoint; + } + /** * Resolve the CSI adaptor address for a CSI driver from configuration. * Expected configuration property name is @@ -43,7 +61,7 @@ public final class ConfigUtils { public static InetSocketAddress getCsiAdaptorAddressForDriver( String driverName, Configuration conf) throws YarnException { String configName = YarnConfiguration.NM_CSI_ADAPTOR_PREFIX - + driverName + ".address"; + + driverName + YarnConfiguration.NM_CSI_ADAPTOR_ADDRESS_SUFFIX; String errorMessage = "Failed to load CSI adaptor address for driver " + driverName + ", configuration property " + configName + " is not defined or invalid."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/package-info.java new file mode 100644 index 00000000000..18b70573fdb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package that includes some CSI utility classes. + */ +package org.apache.hadoop.yarn.util.csi; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto index 9dcb8a73551..146f5bfba31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto @@ -31,4 +31,10 @@ service CsiAdaptorProtocolService { rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest) returns (ValidateVolumeCapabilitiesResponse); + + rpc nodePublishVolume (NodePublishVolumeRequest) + returns (NodePublishVolumeResponse); + + rpc nodeUnpublishVolume (NodeUnpublishVolumeRequest) + returns (NodeUnpublishVolumeResponse); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto index c9adbea7839..9b645e1a9e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto @@ -66,4 +66,28 @@ message GetPluginInfoRequest { message GetPluginInfoResponse { required string name = 1; required string vendor_version = 2; +} + +message NodePublishVolumeRequest { + required string volume_id = 1; + repeated StringStringMapProto publish_context = 2; + optional string staging_target_path = 3; + required string target_path = 4; + required VolumeCapability volume_capability = 5; + required bool readonly = 6; + repeated StringStringMapProto secrets = 7; + repeated StringStringMapProto volume_context = 8; +} + +message NodePublishVolumeResponse { + // Intentionally empty. +} + +message NodeUnpublishVolumeRequest { + required string volume_id = 1; + required string target_path = 2; +} + +message NodeUnpublishVolumeResponse { + // Intentionally empty. } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java index e466ce7ea57..f8681768c50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java @@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.service.api.records; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.gson.annotations.SerializedName; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import java.util.Map; import java.util.Objects; +import java.util.Set; /** * ResourceInformation determines unit/name/value of resource types in addition to memory and vcores. It will be part of Resource object @@ -40,11 +42,25 @@ public class ResourceInformation { @SerializedName("attributes") private Map attributes = null; + @SerializedName("tags") + private Set tags = null; + public ResourceInformation value(Long value) { this.value = value; return this; } + public ResourceInformation tags(Set resourceTags) { + this.tags = resourceTags; + return this; + } + + @ApiModelProperty(value = "") + @JsonProperty("tags") + public Set getTags() { + return tags == null ? ImmutableSet.of() : tags; + } + @ApiModelProperty(value = "") @JsonProperty("attributes") public Map getAttributes() { @@ -116,6 +132,7 @@ public class ResourceInformation { sb.append(" unit: ").append(toIndentedString(unit)).append("\n"); sb.append(" attributes: ").append(toIndentedString(attributes)) .append("\n"); + sb.append(" tags: ").append(toIndentedString(tags)).append("\n"); sb.append("}"); return sb.toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 9895fba82bf..f885b25d4da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -755,6 +755,7 @@ public class Component implements EventHandler { entry.getKey(), specInfo.getUnit(), specInfo.getValue(), + specInfo.getTags(), specInfo.getAttributes()); resource.setResourceInformation(resourceName, ri); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java index 25c502f6494..04c84dc1a2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java @@ -231,5 +231,6 @@ public class TestAppJsonResolve extends Assert { Assert.assertEquals("yarn.io/csi-volume", volume.getKey()); Assert.assertEquals(100L, volume.getValue().getValue().longValue()); Assert.assertEquals(2, volume.getValue().getAttributes().size()); + Assert.assertEquals(1, volume.getValue().getTags().size()); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json index 74569bd0ba1..ef8e3238cc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json @@ -14,6 +14,7 @@ "yarn.io/csi-volume": { "value": 100, "unit": "Gi", + "tags": ["sample-tag"], "attributes" : { "driver" : "hostpath", "mountPath" : "/mnt/data" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java index 2e10f720468..a43d087a826 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java @@ -25,10 +25,18 @@ import org.apache.hadoop.yarn.api.CsiAdaptorPB; import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -82,6 +90,34 @@ public class CsiAdaptorProtocolPBClientImpl } } + @Override + public NodePublishVolumeResponse nodePublishVolume( + NodePublishVolumeRequest request) throws IOException, YarnException { + CsiAdaptorProtos.NodePublishVolumeRequest requestProto = + ((NodePublishVolumeRequestPBImpl) request).getProto(); + try { + return new NodePublishVolumeResponsePBImpl( + proxy.nodePublishVolume(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public NodeUnpublishVolumeResponse nodeUnpublishVolume( + NodeUnpublishVolumeRequest request) throws YarnException, IOException { + CsiAdaptorProtos.NodeUnpublishVolumeRequest requestProto = + ((NodeUnpublishVolumeRequestPBImpl) request).getProto(); + try { + return new NodeUnpublishVolumeResponsePBImpl( + proxy.nodeUnpublishVolume(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + @Override public void close() throws IOException { if(this.proxy != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java index 9a194351e56..624ad3730ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java @@ -23,9 +23,15 @@ import org.apache.hadoop.yarn.api.CsiAdaptorPB; import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -72,4 +78,34 @@ public class CsiAdaptorProtocolPBServiceImpl implements CsiAdaptorPB { throw new ServiceException(e); } } + + @Override + public CsiAdaptorProtos.NodePublishVolumeResponse nodePublishVolume( + RpcController controller, + CsiAdaptorProtos.NodePublishVolumeRequest request) + throws ServiceException { + try { + NodePublishVolumeRequestPBImpl req = + new NodePublishVolumeRequestPBImpl(request); + NodePublishVolumeResponse response = real.nodePublishVolume(req); + return ((NodePublishVolumeResponsePBImpl) response).getProto(); + } catch (YarnException | IOException e) { + throw new ServiceException(e); + } + } + + @Override + public CsiAdaptorProtos.NodeUnpublishVolumeResponse nodeUnpublishVolume( + RpcController controller, + CsiAdaptorProtos.NodeUnpublishVolumeRequest request) + throws ServiceException { + try { + NodeUnpublishVolumeRequestPBImpl req = + new NodeUnpublishVolumeRequestPBImpl(request); + NodeUnpublishVolumeResponse response = real.nodeUnpublishVolume(req); + return ((NodeUnpublishVolumeResponsePBImpl) response).getProto(); + } catch (YarnException | IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeRequestPBImpl.java new file mode 100644 index 00000000000..c3590230c53 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeRequestPBImpl.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; +import org.apache.hadoop.yarn.proto.YarnProtos; + +import java.util.Map; + +/** + * Request to publish volume on node manager. + */ +public class NodePublishVolumeRequestPBImpl extends + NodePublishVolumeRequest { + + private CsiAdaptorProtos.NodePublishVolumeRequest.Builder builder; + + public NodePublishVolumeRequestPBImpl() { + this.builder = CsiAdaptorProtos.NodePublishVolumeRequest.newBuilder(); + } + + public NodePublishVolumeRequestPBImpl( + CsiAdaptorProtos.NodePublishVolumeRequest request) { + this.builder = request.toBuilder(); + } + + public CsiAdaptorProtos.NodePublishVolumeRequest getProto() { + Preconditions.checkNotNull(builder); + return builder.build(); + } + + @Override + public void setVolumeId(String volumeId) { + Preconditions.checkNotNull(builder); + builder.setVolumeId(volumeId); + } + + @Override + public String getVolumeId() { + Preconditions.checkNotNull(builder); + return builder.getVolumeId(); + } + + @Override + public void setReadonly(boolean readonly) { + Preconditions.checkNotNull(builder); + builder.setReadonly(readonly); + } + + @Override + public boolean getReadOnly() { + Preconditions.checkNotNull(builder); + return builder.getReadonly(); + } + + @Override + public void setSecrets(Map secrets) { + if (secrets != null) { + Preconditions.checkNotNull(builder); + for(Map.Entry entry : secrets.entrySet()) { + YarnProtos.StringStringMapProto mapEntry = + YarnProtos.StringStringMapProto.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + .build(); + builder.addSecrets(mapEntry); + } + } + } + + @Override + public Map getSecrets() { + Preconditions.checkNotNull(builder); + return builder.getSecretsCount() > 0 ? + ProtoUtils.convertStringStringMapProtoListToMap( + builder.getSecretsList()) : ImmutableMap.of(); + } + + @Override + public String getTargetPath() { + Preconditions.checkNotNull(builder); + return builder.getTargetPath(); + } + + @Override + public void setStagingPath(String stagingPath) { + Preconditions.checkNotNull(builder); + builder.setStagingTargetPath(stagingPath); + } + + @Override + public String getStagingPath() { + Preconditions.checkNotNull(builder); + return builder.getStagingTargetPath(); + } + + @Override + public void setPublishContext(Map publishContext) { + if (publishContext != null) { + Preconditions.checkNotNull(builder); + for(Map.Entry entry : publishContext.entrySet()) { + YarnProtos.StringStringMapProto mapEntry = + YarnProtos.StringStringMapProto.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + .build(); + builder.addPublishContext(mapEntry); + } + } + } + + @Override + public Map getPublishContext() { + Preconditions.checkNotNull(builder); + return builder.getPublishContextCount() > 0 ? + ProtoUtils.convertStringStringMapProtoListToMap( + builder.getPublishContextList()) : ImmutableMap.of(); + } + + @Override + public void setTargetPath(String targetPath) { + if (targetPath != null) { + Preconditions.checkNotNull(builder); + builder.setTargetPath(targetPath); + } + } + + @Override + public void setVolumeCapability( + VolumeCapability capability) { + if (capability != null) { + CsiAdaptorProtos.VolumeCapability vc = + CsiAdaptorProtos.VolumeCapability.newBuilder() + .setAccessMode(CsiAdaptorProtos.VolumeCapability + .AccessMode.valueOf( + capability.getAccessMode().ordinal())) + .setVolumeType(CsiAdaptorProtos.VolumeCapability + .VolumeType.valueOf(capability.getVolumeType().ordinal())) + .addAllMountFlags(capability.getMountFlags()) + .build(); + builder.setVolumeCapability(vc); + } + } + + @Override + public VolumeCapability getVolumeCapability() { + CsiAdaptorProtos.VolumeCapability cap0 = builder.getVolumeCapability(); + if (builder.hasVolumeCapability()) { + return new VolumeCapability( + ValidateVolumeCapabilitiesRequest.AccessMode + .valueOf(cap0.getAccessMode().name()), + ValidateVolumeCapabilitiesRequest.VolumeType + .valueOf(cap0.getVolumeType().name()), + cap0.getMountFlagsList()); + } + return null; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeResponsePBImpl.java new file mode 100644 index 00000000000..cbdf91fd50a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeResponsePBImpl.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; + +/** + * Protobuf record class for node publish response. + */ +public class NodePublishVolumeResponsePBImpl + extends NodePublishVolumeResponse { + + private CsiAdaptorProtos.NodePublishVolumeResponse.Builder builder; + + public NodePublishVolumeResponsePBImpl( + CsiAdaptorProtos.NodePublishVolumeResponse proto) { + this.builder = proto.toBuilder(); + } + + public NodePublishVolumeResponsePBImpl() { + this.builder = CsiAdaptorProtos.NodePublishVolumeResponse + .newBuilder(); + } + + public CsiAdaptorProtos.NodePublishVolumeResponse getProto() { + Preconditions.checkNotNull(builder); + return builder.build(); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeRequestPBImpl.java new file mode 100644 index 00000000000..455b1f770be --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeRequestPBImpl.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import com.google.common.base.Preconditions; +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; + +/** + * The protobuf record class for request to un-publish volume on node manager. + */ +public class NodeUnpublishVolumeRequestPBImpl extends + NodeUnpublishVolumeRequest { + + private CsiAdaptorProtos.NodeUnpublishVolumeRequest.Builder builder; + + public NodeUnpublishVolumeRequestPBImpl() { + this.builder = CsiAdaptorProtos.NodeUnpublishVolumeRequest.newBuilder(); + } + + public NodeUnpublishVolumeRequestPBImpl( + CsiAdaptorProtos.NodeUnpublishVolumeRequest request) { + this.builder = request.toBuilder(); + } + + public CsiAdaptorProtos.NodeUnpublishVolumeRequest getProto() { + Preconditions.checkNotNull(builder); + return builder.build(); + } + + @Override + public void setVolumeId(String volumeId) { + Preconditions.checkNotNull(builder); + this.builder.setVolumeId(volumeId); + } + + @Override + public void setTargetPath(String targetPath) { + Preconditions.checkNotNull(builder); + this.builder.setTargetPath(targetPath); + } + + @Override + public String getVolumeId() { + return builder.getVolumeId(); + } + + @Override + public String getTargetPath() { + return builder.getTargetPath(); + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeResponsePBImpl.java new file mode 100644 index 00000000000..8406e419e52 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeResponsePBImpl.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; + +/** + * Response to the un-publish volume request on node manager. + */ +public class NodeUnpublishVolumeResponsePBImpl extends + NodeUnpublishVolumeResponse { + + private CsiAdaptorProtos.NodeUnpublishVolumeResponse.Builder builder; + + public NodeUnpublishVolumeResponsePBImpl() { + this.builder = CsiAdaptorProtos.NodeUnpublishVolumeResponse.newBuilder(); + } + + public NodeUnpublishVolumeResponsePBImpl( + CsiAdaptorProtos.NodeUnpublishVolumeResponse response) { + this.builder = response.toBuilder(); + } + + public CsiAdaptorProtos.NodeUnpublishVolumeResponse getProto() { + Preconditions.checkNotNull(builder); + return builder.build(); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index ff64c90e8d2..a0e0eda74f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -4083,4 +4083,19 @@ yarn.nodemanager.csi-driver-adaptor.addresses + + + + CSI driver names running on this node, multiple driver names need to + be delimited by comma. The driver name should be same value returned + by the getPluginInfo call. For each of the CSI driver name, it must + to define following two corresponding properties: + "yarn.nodemanager.csi-driver.${NAME}.endpoint" + "yarn.nodemanager.csi-driver-adaptor.${NAME}.address" + The 1st property defines where the driver's endpoint is; + 2nd property defines where the mapping csi-driver-adaptor's address is. + + yarn.nodemanager.csi-driver.names + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml index 1a19f0e573a..44c2607a8e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml @@ -180,6 +180,24 @@ csi.v0 + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${basedir}/target/generated-sources/protobuf/java + + + + + \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java index f94275f40e9..7020f061f99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java @@ -21,29 +21,36 @@ import com.google.common.annotations.VisibleForTesting; import csi.v0.Csi; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; import org.apache.hadoop.yarn.csi.client.CsiClient; import org.apache.hadoop.yarn.csi.client.CsiClientImpl; import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory; -import org.apache.hadoop.yarn.csi.utils.ConfigUtils; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.util.csi.CsiConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; /** * This is a Hadoop RPC server, we uses the Hadoop RPC framework here * because we need to stick to the security model current Hadoop supports. */ -public class CsiAdaptorProtocolService extends AbstractService +public class CsiAdaptorProtocolService extends AuxiliaryService implements CsiAdaptorProtocol { private static final Logger LOG = @@ -54,6 +61,12 @@ public class CsiAdaptorProtocolService extends AbstractService private CsiClient csiClient; private String csiDriverName; + public CsiAdaptorProtocolService() { + super(CsiAdaptorProtocolService.class.getName()); + // TODO read this from configuration + this.csiDriverName = "ch.ctrox.csi.s3-driver"; + } + public CsiAdaptorProtocolService(String driverName, String domainSocketPath) { super(CsiAdaptorProtocolService.class.getName()); @@ -68,7 +81,11 @@ public class CsiAdaptorProtocolService extends AbstractService @Override protected void serviceInit(Configuration conf) throws Exception { - adaptorServiceAddress = ConfigUtils + + String driverEndpoint = CsiConfigUtils + .getCsiDriverEndpoint(csiDriverName, conf); + this.csiClient = new CsiClientImpl(driverEndpoint); + adaptorServiceAddress = CsiConfigUtils .getCsiAdaptorAddressForDriver(csiDriverName, conf); super.serviceInit(conf); } @@ -119,4 +136,55 @@ public class CsiAdaptorProtocolService extends AbstractService Csi.ValidateVolumeCapabilitiesResponse.class) .convertFrom(response); } + + @Override + public NodePublishVolumeResponse nodePublishVolume( + NodePublishVolumeRequest request) throws YarnException, IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Received nodePublishVolume call, request: {}", + request.toString()); + } + Csi.NodePublishVolumeRequest req = ProtoTranslatorFactory + .getTranslator(NodePublishVolumeRequest.class, + Csi.NodePublishVolumeRequest.class).convertTo(request); + if (LOG.isDebugEnabled()) { + LOG.debug("Translate to CSI proto message: {}", req.toString()); + } + csiClient.nodePublishVolume(req); + return NodePublishVolumeResponse.newInstance(); + } + + @Override + public NodeUnpublishVolumeResponse nodeUnpublishVolume( + NodeUnpublishVolumeRequest request) throws YarnException, IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Received nodeUnpublishVolume call, request: {}", + request.toString()); + } + Csi.NodeUnpublishVolumeRequest req = ProtoTranslatorFactory + .getTranslator(NodeUnpublishVolumeRequest.class, + Csi.NodeUnpublishVolumeRequest.class).convertTo(request); + if (LOG.isDebugEnabled()) { + LOG.debug("Translate to CSI proto message: {}", req.toString()); + } + csiClient.nodeUnpublishVolume(req); + return NodeUnpublishVolumeResponse.newInstance(); + } + + @Override + public void initializeApplication( + ApplicationInitializationContext initAppContext) { + // do nothing + } + + @Override + public void stopApplication( + ApplicationTerminationContext stopAppContext) { + // do nothing + } + + @Override + public ByteBuffer getMetaData() { + return ByteBuffer.allocate(0); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java index d31c0c9b75f..837b667a5e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java @@ -40,4 +40,10 @@ public interface CsiClient { Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities( Csi.ValidateVolumeCapabilitiesRequest request) throws IOException; + + Csi.NodePublishVolumeResponse nodePublishVolume( + Csi.NodePublishVolumeRequest request) throws IOException; + + Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume( + Csi.NodeUnpublishVolumeRequest request) throws IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java index 5b3d2e23c31..0a107e16b5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java @@ -59,4 +59,24 @@ public class CsiClientImpl implements CsiClient { .validateVolumeCapabilities(request); } } + + @Override + public Csi.NodePublishVolumeResponse nodePublishVolume( + Csi.NodePublishVolumeRequest request) throws IOException { + try (CsiGrpcClient client = CsiGrpcClient.newBuilder() + .setDomainSocketAddress(address).build()) { + return client.createNodeBlockingStub() + .nodePublishVolume(request); + } + } + + @Override + public Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume( + Csi.NodeUnpublishVolumeRequest request) throws IOException { + try (CsiGrpcClient client = CsiGrpcClient.newBuilder() + .setDomainSocketAddress(address).build()) { + return client.createNodeBlockingStub() + .nodeUnpublishVolume(request); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java index c4f042ebc18..bcf634addcb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java @@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; * Protobuf message translator for GetPluginInfoResponse and * Csi.GetPluginInfoResponse. */ -public class GetPluginInfoResponseProtoTranslator implements +public class GetPluginInfoResponseProtoTranslator implements ProtoTranslator { @Override public Csi.GetPluginInfoResponse convertTo( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodePublishVolumeRequestProtoTranslator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodePublishVolumeRequestProtoTranslator.java new file mode 100644 index 00000000000..e86dd3fcc9f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodePublishVolumeRequestProtoTranslator.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.translator; + +import csi.v0.Csi; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * This class helps to transform a YARN side NodePublishVolumeRequest + * to corresponding CSI protocol message. + * @param YARN NodePublishVolumeRequest + * @param CSI NodePublishVolumeRequest + */ +public class NodePublishVolumeRequestProtoTranslator implements + ProtoTranslator { + + @Override + public Csi.NodePublishVolumeRequest convertTo( + NodePublishVolumeRequest messageA) throws YarnException { + Csi.NodePublishVolumeRequest.Builder builder = + Csi.NodePublishVolumeRequest.newBuilder(); + ValidateVolumeCapabilitiesRequest.VolumeCapability cap = + messageA.getVolumeCapability(); + Csi.VolumeCapability csiVolumeCap = Csi.VolumeCapability.newBuilder() + .setAccessMode(Csi.VolumeCapability.AccessMode.newBuilder() + .setModeValue(cap.getAccessMode().ordinal())) // access mode + // TODO support block + .setMount(Csi.VolumeCapability.MountVolume.newBuilder() + // TODO support fsType + .setFsType("xfs") // fs type + .addAllMountFlags(cap.getMountFlags())) // mount flags + .build(); + builder.setVolumeCapability(csiVolumeCap); + builder.setVolumeId(messageA.getVolumeId()); + builder.setTargetPath(messageA.getTargetPath()); + builder.setReadonly(messageA.getReadOnly()); + builder.putAllNodePublishSecrets(messageA.getSecrets()); + builder.putAllPublishInfo(messageA.getPublishContext()); + builder.setStagingTargetPath(messageA.getStagingPath()); + return builder.build(); + } + + @Override + public NodePublishVolumeRequest convertFrom( + Csi.NodePublishVolumeRequest messageB) throws YarnException { + Csi.VolumeCapability cap0 = messageB.getVolumeCapability(); + ValidateVolumeCapabilitiesRequest.VolumeCapability cap = + new ValidateVolumeCapabilitiesRequest.VolumeCapability( + ValidateVolumeCapabilitiesRequest.AccessMode + .valueOf(cap0.getAccessMode().getMode().name()), + ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM, + cap0.getMount().getMountFlagsList()); + return NodePublishVolumeRequest.newInstance( + messageB.getVolumeId(), messageB.getReadonly(), + messageB.getTargetPath(), messageB.getStagingTargetPath(), + cap, messageB.getPublishInfoMap(), + messageB.getNodePublishSecretsMap()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodeUnpublishVolumeRequestProtoTranslator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodeUnpublishVolumeRequestProtoTranslator.java new file mode 100644 index 00000000000..485237e3697 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodeUnpublishVolumeRequestProtoTranslator.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.translator; + +import csi.v0.Csi; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * This class helps to transform a YARN side NodeUnpublishVolumeRequest + * to corresponding CSI protocol message. + * @param YARN NodeUnpublishVolumeRequest + * @param CSI NodeUnpublishVolumeRequest + */ +public class NodeUnpublishVolumeRequestProtoTranslator implements + ProtoTranslator { + + @Override + public Csi.NodeUnpublishVolumeRequest convertTo( + NodeUnpublishVolumeRequest messageA) throws YarnException { + return Csi.NodeUnpublishVolumeRequest.newBuilder() + .setVolumeId(messageA.getVolumeId()) + .setTargetPath(messageA.getTargetPath()) + .build(); + } + + @Override + public NodeUnpublishVolumeRequest convertFrom( + Csi.NodeUnpublishVolumeRequest messageB) throws YarnException { + return NodeUnpublishVolumeRequest + .newInstance(messageB.getVolumeId(), messageB.getTargetPath()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java index 5eb76ffab5f..1a7306f0bf2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java @@ -18,8 +18,11 @@ package org.apache.hadoop.yarn.csi.translator; import csi.v0.Csi; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse; /** * Factory class to get desired proto transformer instance. @@ -57,6 +60,15 @@ public final class ProtoTranslatorFactory { } else if (yarnProto == ValidateVolumeCapabilitiesResponse.class && csiProto == Csi.ValidateVolumeCapabilitiesResponse.class) { return new ValidationVolumeCapabilitiesResponseProtoTranslator(); + } else if (yarnProto == NodePublishVolumeRequest.class + && csiProto == Csi.NodePublishVolumeRequest.class) { + return new NodePublishVolumeRequestProtoTranslator(); + } else if (yarnProto == GetPluginInfoResponse.class + && csiProto == Csi.GetPluginInfoResponse.class) { + return new GetPluginInfoResponseProtoTranslator(); + } else if (yarnProto == NodeUnpublishVolumeRequest.class + && csiProto == Csi.NodeUnpublishVolumeRequest.class) { + return new NodeUnpublishVolumeRequestProtoTranslator(); } throw new IllegalArgumentException("A problem is found while processing" + " proto message translating. Unexpected message types," diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java index 128240d86d3..d6ee231b0c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java @@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResp import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; import org.apache.hadoop.yarn.client.NMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.csi.client.CsiClient; +import org.apache.hadoop.yarn.csi.client.ICsiClientTest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.junit.AfterClass; @@ -81,89 +81,18 @@ public class TestCsiAdaptorService { conf.setSocketAddr( YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address", address); + conf.set( + YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint", + "unix:///tmp/test-driver.scok"); CsiAdaptorProtocolService service = new CsiAdaptorProtocolService("test-driver", domainSocket); - - // inject a fake CSI client - // this client validates if the ValidateVolumeCapabilitiesRequest - // is integrity, and then reply a fake response - service.setCsiClient(new CsiClient() { - @Override - public Csi.GetPluginInfoResponse getPluginInfo() { - return Csi.GetPluginInfoResponse.newBuilder() - .setName("test-plugin") - .setVendorVersion("0.1") - .build(); - } - - @Override - public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities( - Csi.ValidateVolumeCapabilitiesRequest request) { - // validate we get all info from the request - Assert.assertEquals("volume-id-0000123", request.getVolumeId()); - Assert.assertEquals(1, request.getVolumeCapabilitiesCount()); - Assert.assertEquals(Csi.VolumeCapability.AccessMode - .newBuilder().setModeValue(5).build(), - request.getVolumeCapabilities(0).getAccessMode()); - Assert.assertTrue(request.getVolumeCapabilities(0).hasMount()); - Assert.assertEquals(2, request.getVolumeCapabilities(0) - .getMount().getMountFlagsCount()); - Assert.assertTrue(request.getVolumeCapabilities(0) - .getMount().getMountFlagsList().contains("mountFlag1")); - Assert.assertTrue(request.getVolumeCapabilities(0) - .getMount().getMountFlagsList().contains("mountFlag2")); - Assert.assertEquals(2, request.getVolumeAttributesCount()); - Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1")); - Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2")); - // return a fake result - return Csi.ValidateVolumeCapabilitiesResponse.newBuilder() - .setSupported(false) - .setMessage("this is a test") - .build(); - } - }); - service.init(conf); service.start(); - try (CsiAdaptorProtocolPBClientImpl client = - new CsiAdaptorProtocolPBClientImpl(1L, address, new Configuration())) { - ValidateVolumeCapabilitiesRequest request = - ValidateVolumeCapabilitiesRequestPBImpl - .newInstance("volume-id-0000123", - ImmutableList.of( - new ValidateVolumeCapabilitiesRequest - .VolumeCapability( - MULTI_NODE_MULTI_WRITER, FILE_SYSTEM, - ImmutableList.of("mountFlag1", "mountFlag2"))), - ImmutableMap.of("k1", "v1", "k2", "v2")); - - ValidateVolumeCapabilitiesResponse response = client - .validateVolumeCapacity(request); - - Assert.assertEquals(false, response.isSupported()); - Assert.assertEquals("this is a test", response.getResponseMessage()); - } finally { - service.stop(); - } - } - - @Test - public void testValidateVolumeWithNMProxy() throws Exception { - ServerSocket ss = new ServerSocket(0); - ss.close(); - InetSocketAddress address = new InetSocketAddress(ss.getLocalPort()); - Configuration conf = new Configuration(); - conf.setSocketAddr( - YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address", - address); - CsiAdaptorProtocolService service = - new CsiAdaptorProtocolService("test-driver", domainSocket); - // inject a fake CSI client // this client validates if the ValidateVolumeCapabilitiesRequest // is integrity, and then reply a fake response - service.setCsiClient(new CsiClient() { + service.setCsiClient(new ICsiClientTest() { @Override public Csi.GetPluginInfoResponse getPluginInfo() { return Csi.GetPluginInfoResponse.newBuilder() @@ -199,9 +128,84 @@ public class TestCsiAdaptorService { } }); + try (CsiAdaptorProtocolPBClientImpl client = + new CsiAdaptorProtocolPBClientImpl(1L, address, new Configuration())) { + ValidateVolumeCapabilitiesRequest request = + ValidateVolumeCapabilitiesRequestPBImpl + .newInstance("volume-id-0000123", + ImmutableList.of( + new ValidateVolumeCapabilitiesRequest + .VolumeCapability( + MULTI_NODE_MULTI_WRITER, FILE_SYSTEM, + ImmutableList.of("mountFlag1", "mountFlag2"))), + ImmutableMap.of("k1", "v1", "k2", "v2")); + + ValidateVolumeCapabilitiesResponse response = client + .validateVolumeCapacity(request); + + Assert.assertEquals(false, response.isSupported()); + Assert.assertEquals("this is a test", response.getResponseMessage()); + } finally { + service.stop(); + } + } + + @Test + public void testValidateVolumeWithNMProxy() throws Exception { + ServerSocket ss = new ServerSocket(0); + ss.close(); + InetSocketAddress address = new InetSocketAddress(ss.getLocalPort()); + Configuration conf = new Configuration(); + conf.setSocketAddr( + YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address", + address); + conf.set( + YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint", + "unix:///tmp/test-driver.scok"); + CsiAdaptorProtocolService service = + new CsiAdaptorProtocolService("test-driver", domainSocket); service.init(conf); service.start(); + // inject a fake CSI client + // this client validates if the ValidateVolumeCapabilitiesRequest + // is integrity, and then reply a fake response + service.setCsiClient(new ICsiClientTest() { + @Override + public Csi.GetPluginInfoResponse getPluginInfo() { + return Csi.GetPluginInfoResponse.newBuilder() + .setName("test-plugin") + .setVendorVersion("0.1") + .build(); + } + + @Override + public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities( + Csi.ValidateVolumeCapabilitiesRequest request) { + // validate we get all info from the request + Assert.assertEquals("volume-id-0000123", request.getVolumeId()); + Assert.assertEquals(1, request.getVolumeCapabilitiesCount()); + Assert.assertEquals(Csi.VolumeCapability.AccessMode + .newBuilder().setModeValue(5).build(), + request.getVolumeCapabilities(0).getAccessMode()); + Assert.assertTrue(request.getVolumeCapabilities(0).hasMount()); + Assert.assertEquals(2, request.getVolumeCapabilities(0) + .getMount().getMountFlagsCount()); + Assert.assertTrue(request.getVolumeCapabilities(0) + .getMount().getMountFlagsList().contains("mountFlag1")); + Assert.assertTrue(request.getVolumeCapabilities(0) + .getMount().getMountFlagsList().contains("mountFlag2")); + Assert.assertEquals(2, request.getVolumeAttributesCount()); + Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1")); + Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2")); + // return a fake result + return Csi.ValidateVolumeCapabilitiesResponse.newBuilder() + .setSupported(false) + .setMessage("this is a test") + .build(); + } + }); + YarnRPC rpc = YarnRPC.create(conf); UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); CsiAdaptorProtocol adaptorClient = NMProxy diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestNodePublishVolumeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestNodePublishVolumeRequest.java new file mode 100644 index 00000000000..5f7fa8774a4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestNodePublishVolumeRequest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor; + +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeRequestPBImpl; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.AccessMode; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.VolumeType; +import org.junit.Assert; +import org.junit.Test; + +/** + * UT for NodePublishVolumeRequest. + */ +public class TestNodePublishVolumeRequest { + + @Test + public void testPBRecord() { + CsiAdaptorProtos.VolumeCapability capability = + CsiAdaptorProtos.VolumeCapability.newBuilder() + .setAccessMode(AccessMode.MULTI_NODE_READER_ONLY) + .setVolumeType(VolumeType.FILE_SYSTEM) + .build(); + CsiAdaptorProtos.NodePublishVolumeRequest proto = + CsiAdaptorProtos.NodePublishVolumeRequest.newBuilder() + .setReadonly(false) + .setVolumeId("test-vol-000001") + .setTargetPath("/mnt/data") + .setStagingTargetPath("/mnt/staging") + .setVolumeCapability(capability) + .build(); + + NodePublishVolumeRequestPBImpl pbImpl = + new NodePublishVolumeRequestPBImpl(proto); + Assert.assertEquals("test-vol-000001", pbImpl.getVolumeId()); + Assert.assertEquals("/mnt/data", pbImpl.getTargetPath()); + Assert.assertEquals("/mnt/staging", pbImpl.getStagingPath()); + Assert.assertFalse(pbImpl.getReadOnly()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java new file mode 100644 index 00000000000..2f150cb8dfb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.client; + +import csi.v0.Csi; + +import java.io.IOException; + +/** + * This interface is used only in testing. It gives default implementation + * of all methods. + */ +public interface ICsiClientTest extends CsiClient { + + @Override + default Csi.GetPluginInfoResponse getPluginInfo() + throws IOException { + return null; + } + + @Override + default Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities( + Csi.ValidateVolumeCapabilitiesRequest request) throws IOException { + return null; + } + + @Override + default Csi.NodePublishVolumeResponse nodePublishVolume( + Csi.NodePublishVolumeRequest request) throws IOException { + return null; + } + + @Override + default Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume( + Csi.NodeUnpublishVolumeRequest request) throws IOException { + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index faa695cf009..05658854b45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -75,6 +75,10 @@ public interface Container extends EventHandler { void setWorkDir(String workDir); + String getCsiVolumesRootDir(); + + void setCsiVolumesRootDir(String volumesRootDir); + String getLogDir(); void setLogDir(String logDir); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index e8f47914227..1d6ba2e1104 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -172,6 +172,7 @@ public class ContainerImpl implements Container { private SlidingWindowRetryPolicy.RetryContext windowRetryContext; private SlidingWindowRetryPolicy retryPolicy; + private String csiVolumesRootDir; private String workDir; private String logDir; private String host; @@ -936,6 +937,16 @@ public class ContainerImpl implements Container { this.workDir = workDir; } + @Override + public String getCsiVolumesRootDir() { + return csiVolumesRootDir; + } + + @Override + public void setCsiVolumesRootDir(String volumesRootDir) { + this.csiVolumesRootDir = volumesRootDir; + } + private void clearIpAndHost() { LOG.info("{} clearing ip and host", containerId); this.ips = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index f06040ea344..9b6fae71833 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -250,6 +250,10 @@ public class ContainerLaunch implements Callable { Path containerWorkDir = deriveContainerWorkDir(); recordContainerWorkDir(containerID, containerWorkDir.toString()); + // Select a root dir for all csi volumes for the container + Path csiVolumesRoot = deriveCsiVolumesRootDir(); + recordContainerCsiVolumesRootDir(containerID, csiVolumesRoot.toString()); + String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); // pid file should be in nm private dir so that it is not // accessible by users @@ -358,6 +362,7 @@ public class ContainerLaunch implements Callable { .setUser(user) .setAppId(appIdStr) .setContainerWorkDir(containerWorkDir) + .setContainerCsiVolumesRootDir(csiVolumesRoot) .setLocalDirs(localDirs) .setLogDirs(logDirs) .setFilecacheDirs(filecacheDirs) @@ -388,6 +393,27 @@ public class ContainerLaunch implements Callable { return ret; } + /** + * Volumes mount point root: + * ${YARN_LOCAL_DIR}/usercache/${user}/filecache/csiVolumes/app/container + * CSI volumes may creates the mount point with different permission bits. + * If we create the volume mount under container work dir, it may + * mess up the existing permission structure, which is restricted by + * linux container executor. So we put all volume mounts under a same + * root dir so it is easier cleanup. + **/ + private Path deriveCsiVolumesRootDir() throws IOException { + final String containerVolumePath = + ContainerLocalizer.USERCACHE + Path.SEPARATOR + + container.getUser() + Path.SEPARATOR + + ContainerLocalizer.FILECACHE + Path.SEPARATOR + + ContainerLocalizer.CSI_VOLIUME_MOUNTS_ROOT + Path.SEPARATOR + + app.getAppId().toString() + Path.SEPARATOR + + container.getContainerId().toString(); + return dirsHandler.getLocalPathForWrite(containerVolumePath, + LocalDirAllocator.SIZE_UNKNOWN, false); + } + private Path deriveContainerWorkDir() throws IOException { final String containerWorkDirPath = @@ -1752,6 +1778,12 @@ public class ContainerLaunch implements Callable { } } + private void recordContainerCsiVolumesRootDir(ContainerId containerId, + String volumesRoot) throws IOException { + container.setCsiVolumesRootDir(volumesRoot); + // TODO persistent to the NM store... + } + protected Path getContainerWorkDir() throws IOException { String containerWorkDir = container.getWorkDir(); if (containerWorkDir == null diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java index 4970c7c8b79..7fc386d9120 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java @@ -24,7 +24,10 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; +import org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommand; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor; @@ -35,7 +38,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime. import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.volume.csi.ContainerVolumePublisher; import org.apache.hadoop.yarn.util.DockerClientConfigHandler; +import org.apache.hadoop.yarn.util.csi.CsiConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -69,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import java.io.File; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.file.Files; @@ -76,6 +82,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -262,6 +269,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime { private Configuration conf; private Context nmContext; private DockerClient dockerClient; + private Map csiClients = new HashMap<>(); private PrivilegedOperationExecutor privilegedOperationExecutor; private String defaultImageName; private Set allowedNetworks = new HashSet<>(); @@ -363,6 +371,9 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime { throw new ContainerExecutionException(message); } + // initialize csi adaptors if necessary + initiateCsiClients(conf); + privilegedContainersAcl = new AccessControlList(conf.getTrimmed( YarnConfiguration.NM_DOCKER_PRIVILEGED_CONTAINERS_ACL, YarnConfiguration.DEFAULT_NM_DOCKER_PRIVILEGED_CONTAINERS_ACL)); @@ -398,6 +409,10 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime { YarnConfiguration.NM_DOCKER_DEFAULT_TMPFS_MOUNTS))); } + public Map getCsiClients() { + return csiClients; + } + @Override public boolean isRuntimeRequested(Map env) { return isDockerContainerRequested(conf, env); @@ -942,6 +957,18 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime { } } + ContainerVolumePublisher publisher = new ContainerVolumePublisher( + container, container.getCsiVolumesRootDir(), this); + try { + Map volumeMounts = publisher.publishVolumes(); + volumeMounts.forEach((local, remote) -> + runCommand.addReadWriteMountLocation(local, remote)); + } catch (YarnException | IOException e) { + throw new ContainerExecutionException( + "Container requests for volume resource but we are failed" + + " to publish volumes on this node"); + } + if (environment.containsKey(ENV_DOCKER_CONTAINER_TMPFS_MOUNTS)) { String[] tmpfsMounts = environment.get(ENV_DOCKER_CONTAINER_TMPFS_MOUNTS) .split(","); @@ -1442,6 +1469,14 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime { ContainerExecutor.Signal signal) throws ContainerExecutionException { Container container = ctx.getContainer(); + ContainerVolumePublisher publisher = new ContainerVolumePublisher( + container, container.getCsiVolumesRootDir(), this); + try { + publisher.unpublishVolumes(); + } catch (YarnException | IOException e) { + throw new ContainerExecutionException(e); + } + // Only need to check whether the container was asked to be privileged. // If the container had failed the permissions checks upon launch, it // would have never been launched and thus we wouldn't be here @@ -1537,4 +1572,33 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime { } } + /** + * Initiate CSI clients to talk to the CSI adaptors on this node and + * cache the clients for easier fetch. + * @param config configuration + * @throws ContainerExecutionException + */ + private void initiateCsiClients(Configuration config) + throws ContainerExecutionException { + String[] driverNames = CsiConfigUtils.getCsiDriverNames(config); + if (driverNames != null && driverNames.length > 0) { + for (String driverName : driverNames) { + try { + // find out the adaptors service address + InetSocketAddress adaptorServiceAddress = + CsiConfigUtils.getCsiAdaptorAddressForDriver(driverName, config); + LOG.info("Initializing a csi-adaptor-client for csi-adaptor {}," + + " csi-driver {}", adaptorServiceAddress.toString(), driverName); + CsiAdaptorProtocolPBClientImpl client = + new CsiAdaptorProtocolPBClientImpl(1L, adaptorServiceAddress, + config); + csiClients.put(driverName, client); + } catch (IOException e1) { + throw new ContainerExecutionException(e1.getMessage()); + } catch (YarnException e2) { + throw new ContainerExecutionException(e2.getMessage()); + } + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index c5b8625b7cd..859f0c32196 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -101,6 +101,7 @@ public class ContainerLocalizer { new FsPermission((short)0710); private static final FsPermission USERCACHE_FOLDER_PERMS = new FsPermission((short) 0755); + public static final String CSI_VOLIUME_MOUNTS_ROOT = "csivolumes"; private final String user; private final String appId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java new file mode 100644 index 00000000000..78f2d2db02a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.volume.csi; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime; +import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Publish/un-publish CSI volumes on node manager. + */ +public class ContainerVolumePublisher { + + private static final Logger LOG = + LoggerFactory.getLogger(ContainerVolumePublisher.class); + + private final Container container; + private final String localMountRoot; + private final DockerLinuxContainerRuntime runtime; + + public ContainerVolumePublisher(Container container, String localMountRoot, + DockerLinuxContainerRuntime runtime) { + LOG.info("Initiate container volume publisher, containerID={}," + + " volume local mount rootDir={}", + container.getContainerId().toString(), localMountRoot); + this.container = container; + this.localMountRoot = localMountRoot; + this.runtime = runtime; + } + + /** + * It first discovers the volume info from container resource; + * then negotiates with CSI driver adaptor to publish the volume on this + * node manager, on a specific directory under container's work dir; + * and then map the local mounted directory to volume target mount in + * the docker container. + * + * CSI volume publish is a two phase work, by reaching up here + * we can assume the 1st phase is done on the RM side, which means + * YARN is already called the controller service of csi-driver + * to publish the volume; here we only need to call the node service of + * csi-driver to publish the volume on this local node manager. + * + * @return a map where each key is the local mounted path on current node, + * and value is the remote mount path on the container. + * @throws YarnException + * @throws IOException + */ + public Map publishVolumes() throws YarnException, + IOException { + LOG.info("publishing volumes"); + Map volumeMounts = new HashMap<>(); + List volumes = getVolumes(); + LOG.info("Found {} volumes to be published on this node", volumes.size()); + for (VolumeMetaData volume : volumes) { + Map bindings = publishVolume(volume); + if (bindings != null && !bindings.isEmpty()) { + volumeMounts.putAll(bindings); + } + } + return volumeMounts; + } + + public void unpublishVolumes() throws YarnException, IOException { + LOG.info("Un-publishing Volumes"); + List volumes = getVolumes(); + LOG.info("Volumes to un-publish {}", volumes.size()); + for (VolumeMetaData volume : volumes) { + this.unpublishVolume(volume); + } + } + + private File getLocalVolumeMountPath( + String containerWorkDir, String volumeId) { + return new File(containerWorkDir, volumeId + "_mount"); + } + + private File getLocalVolumeStagingPath( + String containerWorkDir, String volumeId) { + return new File(containerWorkDir, volumeId + "_staging"); + } + + private List getVolumes() throws InvalidVolumeException { + List volumes = new ArrayList<>(); + Resource containerResource = container.getResource(); + if (containerResource != null) { + for (ResourceInformation resourceInformation : + containerResource.getAllResourcesListCopy()) { + if (resourceInformation.getTags().contains("system:csi-volume")) { + volumes.addAll(VolumeMetaData.fromResource(resourceInformation)); + } + } + } + if (volumes.size() > 0) { + LOG.info("Total number of volumes require provisioning is {}", + volumes.size()); + } + return volumes; + } + + private Map publishVolume(VolumeMetaData volume) + throws IOException, YarnException { + Map bindVolumes = new HashMap<>(); + // compose a local mount for CSI volume with the container ID + File localMount = getLocalVolumeMountPath( + localMountRoot, volume.getVolumeId().toString()); + File localStaging = getLocalVolumeStagingPath( + localMountRoot, volume.getVolumeId().toString()); + LOG.info("Volume {}, local mount path: {}, local staging path {}", + volume.getVolumeId().toString(), localMount, localStaging); + + NodePublishVolumeRequest publishRequest = NodePublishVolumeRequest + .newInstance(volume.getVolumeId().getId(), // volume Id + false, // read only flag + localMount.getAbsolutePath(), // target path + localStaging.getAbsolutePath(), // staging path + new ValidateVolumeCapabilitiesRequest.VolumeCapability( + ValidateVolumeCapabilitiesRequest + .AccessMode.SINGLE_NODE_WRITER, + ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM, + ImmutableList.of()), // capability + ImmutableMap.of(), // publish context + ImmutableMap.of()); // secrets + + // make sure the volume is a known type + if (runtime.getCsiClients().get(volume.getDriverName()) == null) { + throw new YarnException("No csi-adaptor is found that can talk" + + " to csi-driver " + volume.getDriverName()); + } + + // publish volume to node + LOG.info("Publish volume on NM, request {}", + publishRequest.toString()); + runtime.getCsiClients().get(volume.getDriverName()) + .nodePublishVolume(publishRequest); + // once succeed, bind the container to this mount + String containerMountPath = volume.getMountPoint(); + bindVolumes.put(localMount.getAbsolutePath(), containerMountPath); + return bindVolumes; + } + + private void unpublishVolume(VolumeMetaData volume) + throws YarnException, IOException { + CsiAdaptorProtocol csiClient = + runtime.getCsiClients().get(volume.getDriverName()); + if (csiClient == null) { + throw new YarnException( + "No csi-adaptor is found that can talk" + + " to csi-driver " + volume.getDriverName()); + } + + // When container is launched, the container work dir is memorized, + // and that is also the dir we mount the volume to. + File localMount = getLocalVolumeMountPath(container.getCsiVolumesRootDir(), + volume.getVolumeId().toString()); + if (!localMount.exists()) { + LOG.info("Local mount {} no longer exist, skipping cleaning" + + " up the volume", localMount.getAbsolutePath()); + return; + } + NodeUnpublishVolumeRequest unpublishRequest = + NodeUnpublishVolumeRequest.newInstance( + volume.getVolumeId().getId(), // volume id + localMount.getAbsolutePath()); // target path + + // un-publish volume from node + LOG.info("Un-publish volume {}, request {}", + volume.getVolumeId().toString(), unpublishRequest.toString()); + csiClient.nodeUnpublishVolume(unpublishRequest); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/package-info.java new file mode 100644 index 00000000000..5b894b884d1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * CSI volumes. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.volume.csi; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerStartContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerStartContext.java index 444a1e0a64c..01d34f169fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerStartContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerStartContext.java @@ -45,6 +45,7 @@ public final class ContainerStartContext { private final String user; private final String appId; private final Path containerWorkDir; + private final Path csiVolumesRootDir; private final List localDirs; private final List logDirs; private final List filecacheDirs; @@ -64,6 +65,7 @@ public final class ContainerStartContext { private String user; private String appId; private Path containerWorkDir; + private Path csiVolumesRoot; private List localDirs; private List logDirs; private List filecacheDirs; @@ -118,6 +120,11 @@ public final class ContainerStartContext { return this; } + public Builder setContainerCsiVolumesRootDir(Path csiVolumesRootDir) { + this.csiVolumesRoot = csiVolumesRootDir; + return this; + } + public Builder setContainerWorkDir(Path containerWorkDir) { this.containerWorkDir = containerWorkDir; return this; @@ -188,6 +195,7 @@ public final class ContainerStartContext { this.containerLogDirs = builder.containerLogDirs; this.userFilecacheDirs = builder.userFilecacheDirs; this.applicationLocalDirs = builder.applicationLocalDirs; + this.csiVolumesRootDir = builder.csiVolumesRoot; } public Container getContainer() { @@ -262,4 +270,8 @@ public final class ContainerStartContext { public List getApplicationLocalDirs() { return Collections.unmodifiableList(this.applicationLocalDirs); } + + public Path getCsiVolumesRootDir() { + return this.csiVolumesRootDir; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index c6860001f6a..f1b39bdb961 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -173,6 +173,16 @@ public class MockContainer implements Container { public void setWorkDir(String workDir) { } + @Override + public String getCsiVolumesRootDir() { + return null; + } + + @Override + public void setCsiVolumesRootDir(String volumesRootDir) { + + } + @Override public String getLogDir() { return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java index 82a4acb04fa..068e8a4fe24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java @@ -146,6 +146,8 @@ public class VolumeImpl implements Volume { @Override public VolumeState transition(VolumeImpl volume, VolumeEvent volumeEvent) { + // Some of CSI driver implementation does't provide the capability + // to validate volumes. Skip this for now. try { // this call could cross node, we should keep the message tight // TODO we should parse the capability from volume resource spec