diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java new file mode 100644 index 00000000000..b894d4e046e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.volume.csi; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; + +/** + * Protocol for the CSI adaptor. + */ +@Private +@Unstable +public interface CsiAdaptorClientProtocol { + + void validateVolume() throws VolumeException; + + void controllerPublishVolume() throws VolumeException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiConstants.java new file mode 100644 index 00000000000..fcf9cf4f332 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiConstants.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.volume.csi; + +/** + * CSI constants. + */ +public final class CsiConstants { + + private CsiConstants() { + // Hide the constructor for this constant class. + } + + public static final String CSI_VOLUME_NAME = "volume.name"; + public static final String CSI_VOLUME_ID = "volume.id"; + public static final String CSI_VOLUME_CAPABILITY = "volume.capability"; + public static final String CSI_DRIVER_NAME = "driver.name"; + public static final String CSI_VOLUME_MOUNT = "volume.mount"; + public static final String CSI_VOLUME_ACCESS_MODE = "volume.accessMode"; + + public static final String CSI_VOLUME_RESOURCE_TAG = "system:csi-volume"; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeCapabilityRange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeCapabilityRange.java new file mode 100644 index 00000000000..e4775fe92e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeCapabilityRange.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.volume.csi; + +import com.google.common.base.Strings; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; + +/** + * Volume capability range that specified in a volume resource request, + * this range defines the desired min/max capacity. + */ +public final class VolumeCapabilityRange { + + private final long minCapacity; + private final long maxCapacity; + private final String unit; + + private VolumeCapabilityRange(long minCapacity, + long maxCapacity, String unit) { + this.minCapacity = minCapacity; + this.maxCapacity = maxCapacity; + this.unit = unit; + } + + public long getMinCapacity() { + return minCapacity; + } + + public long getMaxCapacity() { + return maxCapacity; + } + + public String getUnit() { + return unit; + } + + @Override + public String toString() { + return "MinCapability: " + minCapacity + unit + + ", MaxCapability: " + maxCapacity + unit; + } + + public static VolumeCapabilityBuilder newBuilder() { + return new VolumeCapabilityBuilder(); + } + + /** + * The builder used to build a VolumeCapabilityRange instance. + */ + public static class VolumeCapabilityBuilder { + // An invalid default value implies this value must be set + private long minCap = -1L; + private long maxCap = Long.MAX_VALUE; + private String unit; + + public VolumeCapabilityBuilder minCapacity(long minCapacity) { + this.minCap = minCapacity; + return this; + } + + public VolumeCapabilityBuilder maxCapacity(long maxCapacity) { + this.maxCap = maxCapacity; + return this; + } + + public VolumeCapabilityBuilder unit(String capacityUnit) { + this.unit = capacityUnit; + return this; + } + + public VolumeCapabilityRange build() throws InvalidVolumeException { + VolumeCapabilityRange + capability = new VolumeCapabilityRange(minCap, maxCap, unit); + validateCapability(capability); + return capability; + } + + private void validateCapability(VolumeCapabilityRange capability) + throws InvalidVolumeException { + if (capability.getMinCapacity() < 0) { + throw new InvalidVolumeException("Invalid volume capability range," + + " minimal capability must not be less than 0. Capability: " + + capability.toString()); + } + if (Strings.isNullOrEmpty(capability.getUnit())) { + throw new InvalidVolumeException("Invalid volume capability range," + + " capability unit is missing. Capability: " + + capability.toString()); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeId.java new file mode 100644 index 00000000000..8acc95e4d9a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeId.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.volume.csi; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.util.StringUtils; + +/** + * Unique ID for a volume. This may or may not come from a storage system, + * YARN depends on this ID to recognized volumes and manage their states. + */ +public class VolumeId { + + private final String volumeId; + + public VolumeId(String volumeId) { + this.volumeId = volumeId; + } + + public String getId() { + return this.volumeId; + } + + @Override + public String toString() { + return this.volumeId; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof VolumeId)) { + return false; + } + return StringUtils.equalsIgnoreCase(volumeId, + ((VolumeId) obj).getId()); + } + + @Override + public int hashCode() { + HashCodeBuilder hc = new HashCodeBuilder(); + hc.append(volumeId); + return hc.toHashCode(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeMetaData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeMetaData.java new file mode 100644 index 00000000000..7f2c92ca818 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeMetaData.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.volume.csi; + +import com.google.common.base.Strings; +import com.google.gson.JsonObject; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; + +import java.util.ArrayList; +import java.util.List; + +/** + * VolumeMetaData defines all valid info for a CSI compatible volume. + */ +public class VolumeMetaData { + + private VolumeId volumeId; + private String volumeName; + private VolumeCapabilityRange volumeCapabilityRange; + private String driverName; + private String mountPoint; + + private void setVolumeId(VolumeId volumeId) { + this.volumeId = volumeId; + } + + private void setVolumeName(String volumeName) { + this.volumeName = volumeName; + } + + private void setVolumeCapabilityRange(VolumeCapabilityRange capability) { + this.volumeCapabilityRange = capability; + } + + private void setDriverName(String driverName) { + this.driverName = driverName; + } + + private void setMountPoint(String mountPoint) { + this.mountPoint = mountPoint; + } + + public boolean isProvisionedVolume() { + return this.volumeId != null; + } + + public VolumeId getVolumeId() { + return volumeId; + } + + public String getVolumeName() { + return volumeName; + } + + public VolumeCapabilityRange getVolumeCapabilityRange() { + return volumeCapabilityRange; + } + + public String getDriverName() { + return driverName; + } + + public String getMountPoint() { + return mountPoint; + } + + public static VolumeSpecBuilder newBuilder() { + return new VolumeSpecBuilder(); + } + + public static List fromResource( + ResourceInformation resourceInfo) throws InvalidVolumeException { + List volumeMetaData = new ArrayList<>(); + if (resourceInfo != null) { + if (resourceInfo.getTags() != null && resourceInfo.getTags() + .contains(CsiConstants.CSI_VOLUME_RESOURCE_TAG)) { + VolumeSpecBuilder builder = VolumeMetaData.newBuilder(); + // Volume ID + if (resourceInfo.getAttributes() + .containsKey(CsiConstants.CSI_VOLUME_ID)) { + String id = resourceInfo.getAttributes() + .get(CsiConstants.CSI_VOLUME_ID); + builder.volumeId(new VolumeId(id)); + } + // Volume name + if (resourceInfo.getAttributes() + .containsKey(CsiConstants.CSI_VOLUME_NAME)) { + builder.volumeName(resourceInfo.getAttributes() + .get(CsiConstants.CSI_VOLUME_NAME)); + } + // CSI driver name + if (resourceInfo.getAttributes() + .containsKey(CsiConstants.CSI_DRIVER_NAME)) { + builder.driverName(resourceInfo.getAttributes() + .get(CsiConstants.CSI_DRIVER_NAME)); + } + // Mount path + if (resourceInfo.getAttributes() + .containsKey(CsiConstants.CSI_VOLUME_MOUNT)) { + builder.mountPoint(resourceInfo.getAttributes() + .get(CsiConstants.CSI_VOLUME_MOUNT)); + } + // Volume capability + VolumeCapabilityRange volumeCapabilityRange = + VolumeCapabilityRange.newBuilder() + .minCapacity(resourceInfo.getValue()) + .unit(resourceInfo.getUnits()) + .build(); + builder.capability(volumeCapabilityRange); + volumeMetaData.add(builder.build()); + } + } + return volumeMetaData; + } + + @Override + public String toString() { + JsonObject json = new JsonObject(); + if (!Strings.isNullOrEmpty(volumeName)) { + json.addProperty(CsiConstants.CSI_VOLUME_NAME, volumeName); + } + if (volumeId != null) { + json.addProperty(CsiConstants.CSI_VOLUME_ID, volumeId.toString()); + } + if (volumeCapabilityRange != null) { + json.addProperty(CsiConstants.CSI_VOLUME_CAPABILITY, + volumeCapabilityRange.toString()); + } + if (!Strings.isNullOrEmpty(driverName)) { + json.addProperty(CsiConstants.CSI_DRIVER_NAME, driverName); + } + if (!Strings.isNullOrEmpty(mountPoint)) { + json.addProperty(CsiConstants.CSI_VOLUME_MOUNT, mountPoint); + } + return json.toString(); + } + + /** + * The builder used to build a VolumeMetaData instance. + */ + public static class VolumeSpecBuilder { + // @CreateVolumeRequest + // The suggested name for the storage space. + private VolumeId volumeId; + private String volumeName; + private VolumeCapabilityRange volumeCapabilityRange; + private String driverName; + private String mountPoint; + + public VolumeSpecBuilder volumeId(VolumeId volumeId) { + this.volumeId = volumeId; + return this; + } + + public VolumeSpecBuilder volumeName(String name) { + this.volumeName = name; + return this; + } + + public VolumeSpecBuilder driverName(String driverName) { + this.driverName = driverName; + return this; + } + + public VolumeSpecBuilder mountPoint(String mountPoint) { + this.mountPoint = mountPoint; + return this; + } + + public VolumeSpecBuilder capability(VolumeCapabilityRange capability) { + this.volumeCapabilityRange = capability; + return this; + } + + public VolumeMetaData build() throws InvalidVolumeException { + VolumeMetaData spec = new VolumeMetaData(); + spec.setVolumeId(volumeId); + spec.setVolumeName(volumeName); + spec.setVolumeCapabilityRange(volumeCapabilityRange); + spec.setDriverName(driverName); + spec.setMountPoint(mountPoint); + validate(spec); + return spec; + } + + private void validate(VolumeMetaData spec) throws InvalidVolumeException { + // Volume name OR Volume ID must be set + if (Strings.isNullOrEmpty(spec.getVolumeName()) + && spec.getVolumeId() == null) { + throw new InvalidVolumeException("Invalid volume, both volume name" + + " and ID are missing from the spec. Volume spec: " + + spec.toString()); + } + // Volume capability must be set + if (spec.getVolumeCapabilityRange() == null) { + throw new InvalidVolumeException("Invalid volume, volume capability" + + " is missing. Volume spec: " + spec.toString()); + } + // CSI driver name must be set + if (Strings.isNullOrEmpty(spec.getDriverName())) { + throw new InvalidVolumeException("Invalid volume, the csi-driver name" + + " is missing. Volume spec: " + spec.toString()); + } + // Mount point must be set + if (Strings.isNullOrEmpty(spec.getMountPoint())) { + throw new InvalidVolumeException("Invalid volume, the mount point" + + " is missing. Volume spec: " + spec.toString()); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/InvalidVolumeException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/InvalidVolumeException.java new file mode 100644 index 00000000000..0559e8a1afb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/InvalidVolumeException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.volume.csi.exception; + +/** + * This exception is thrown when a volume is found not valid. + */ +public class InvalidVolumeException extends VolumeException { + + public InvalidVolumeException(String message) { + super(message); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeException.java new file mode 100644 index 00000000000..60f96599c7c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.volume.csi.exception; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Base class for all volume related exceptions. + */ +public class VolumeException extends YarnException { + + public VolumeException(String message) { + super(message); + } + + public VolumeException(String message, Exception e) { + super(message, e); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeProvisioningException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeProvisioningException.java new file mode 100644 index 00000000000..348eaf10cc5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeProvisioningException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.volume.csi.exception; + +/** + * Exception throws when volume provisioning is failed. + */ +public class VolumeProvisioningException extends VolumeException { + + public VolumeProvisioningException(String message) { + super(message); + } + + public VolumeProvisioningException(String message, Exception e) { + super(message, e); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/package-info.java new file mode 100644 index 00000000000..40737f04f03 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains volume related exception classes. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.volume.csi.exception; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/package-info.java new file mode 100644 index 00000000000..ef4ffef5646 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains common volume related classes. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.volume.csi; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 35620781677..f829a4cbbc4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -121,6 +122,7 @@ public class RMActiveServiceContext { private MultiNodeSortingManager multiNodeSortingManager; private ProxyCAManager proxyCAManager; + private VolumeManager volumeManager; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -569,4 +571,16 @@ public class RMActiveServiceContext { public void setProxyCAManager(ProxyCAManager proxyCAManager) { this.proxyCAManager = proxyCAManager; } + + @Private + @Unstable + public VolumeManager getVolumeManager() { + return this.volumeManager; + } + + @Private + @Unstable + public void setVolumeManager(VolumeManager volumeManager) { + this.volumeManager = volumeManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index f06befe2eae..4e9846c731b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; /** * Context of the ResourceManager. @@ -193,4 +194,8 @@ public interface RMContext extends ApplicationMasterServiceContext { ProxyCAManager getProxyCAManager(); void setProxyCAManager(ProxyCAManager proxyCAManager); + + VolumeManager getVolumeManager(); + + void setVolumeManager(VolumeManager volumeManager); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 48f74d368e9..ab71134c93f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.util.Clock; @@ -648,6 +649,17 @@ public class RMContextImpl implements RMContext { public void setProxyCAManager(ProxyCAManager proxyCAManager) { this.activeServiceContext.setProxyCAManager(proxyCAManager); } + + @Override + public VolumeManager getVolumeManager() { + return activeServiceContext.getVolumeManager(); + } + + @Override + public void setVolumeManager(VolumeManager volumeManager) { + this.activeServiceContext.setVolumeManager(volumeManager); + } + // Note: Read java doc before adding any services over here. @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index a89069a5247..69d50f217e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -109,6 +109,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManagerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -136,6 +139,7 @@ import java.nio.charset.Charset; import java.security.PrivilegedExceptionAction; import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -845,6 +849,16 @@ public class ResourceManager extends CompositeService addIfService(systemServiceManager); } + // Add volume manager to RM context when it is necessary + String[] amsProcessorList = conf.getStrings( + YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS); + if (amsProcessorList != null&& Arrays.stream(amsProcessorList) + .anyMatch(s -> VolumeAMSProcessor.class.getName().equals(s))) { + VolumeManager volumeManager = new VolumeManagerImpl(); + rmContext.setVolumeManager(volumeManager); + addIfService(volumeManager); + } + super.serviceInit(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java new file mode 100644 index 00000000000..043e7ae8deb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; +import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; + +/** + * Client talks to CSI adaptor. + */ +public class CsiAdaptorClient implements CsiAdaptorClientProtocol { + + @Override + public void validateVolume() throws VolumeException { + // TODO + } + + @Override public void controllerPublishVolume() throws VolumeException { + // TODO + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeBuilder.java new file mode 100644 index 00000000000..af093737cd8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeBuilder.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl; +import org.apache.hadoop.yarn.server.volume.csi.VolumeCapabilityRange; +import org.apache.hadoop.yarn.server.volume.csi.VolumeId; +import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; + +import java.util.Optional; +import java.util.UUID; + +/** + * Helper class to build a {@link Volume}. + */ +public final class VolumeBuilder { + + private String id; + private String name; + private Long min; + private Long max; + private String unit; + private String driver; + private String mount; + + private VolumeBuilder() { + // hide constructor + } + + public static VolumeBuilder newBuilder() { + return new VolumeBuilder(); + } + + public VolumeBuilder volumeId(String volumeId) { + this.id = volumeId; + return this; + } + + public VolumeBuilder volumeName(String volumeName) { + this.name = volumeName; + return this; + } + + public VolumeBuilder minCapability(long minCapability) { + this.min = Long.valueOf(minCapability); + return this; + } + + public VolumeBuilder maxCapability(long maxCapability) { + this.max = Long.valueOf(maxCapability); + return this; + } + + public VolumeBuilder unit(String capUnit) { + this.unit = capUnit; + return this; + } + + public VolumeBuilder driverName(String driverName) { + this.driver = driverName; + return this; + } + + public VolumeBuilder mountPoint(String mountPoint) { + this.mount = mountPoint; + return this; + } + + public Volume build() throws InvalidVolumeException { + VolumeId vid = new VolumeId( + Optional.ofNullable(id) + .orElse(UUID.randomUUID().toString())); + + VolumeCapabilityRange volumeCap = VolumeCapabilityRange.newBuilder() + .minCapacity(Optional.ofNullable(min).orElse(0L)) + .maxCapacity(Optional.ofNullable(max).orElse(Long.MAX_VALUE)) + .unit(Optional.ofNullable(unit).orElse("Gi")) + .build(); + + VolumeMetaData meta = VolumeMetaData.newBuilder() + .capability(volumeCap) + .driverName(Optional.ofNullable(driver).orElse("test-driver")) + .mountPoint(Optional.ofNullable(mount).orElse("/mnt/data")) + .volumeName(name) + .volumeId(vid) + .build(); + return new VolumeImpl(meta); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java new file mode 100644 index 00000000000..5f2669de497 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask; +import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; + +import java.util.concurrent.ScheduledFuture; + +/** + * Main interface for volume manager that manages all volumes. + * Volume manager talks to a CSI controller plugin to handle the + * volume operations before it is available to be published on + * any node manager. + */ +@Private +@Unstable +public interface VolumeManager { + + /** + * @return all known volumes and their states. + */ + @VisibleForTesting + VolumeStates getVolumeStates(); + + @VisibleForTesting + void setClient(CsiAdaptorClientProtocol client); + + /** + * Start to supervise on a volume. + * @param volume + * @return the volume being managed by the manager. + */ + Volume addOrGetVolume(Volume volume); + + /** + * Execute volume provisioning tasks as backend threads. + * @param volumeProvisioningTask + * @param delaySecond + */ + ScheduledFuture schedule( + VolumeProvisioningTask volumeProvisioningTask, int delaySecond); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java new file mode 100644 index 00000000000..5252f535146 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask; +import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * A service manages all volumes. + */ +public class VolumeManagerImpl extends AbstractService + implements VolumeManager { + + private static final Log LOG = LogFactory.getLog(VolumeManagerImpl.class); + + private final VolumeStates volumeStates; + private ScheduledExecutorService provisioningExecutor; + private CsiAdaptorClientProtocol adaptorClient; + + private final static int PROVISIONING_TASK_THREAD_POOL_SIZE = 10; + + public VolumeManagerImpl() { + super(VolumeManagerImpl.class.getName()); + this.volumeStates = new VolumeStates(); + this.provisioningExecutor = Executors + .newScheduledThreadPool(PROVISIONING_TASK_THREAD_POOL_SIZE); + this.adaptorClient = new CsiAdaptorClient(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + provisioningExecutor.shutdown(); + super.serviceStop(); + } + + @Override + public VolumeStates getVolumeStates() { + return this.volumeStates; + } + + @Override + public Volume addOrGetVolume(Volume volume) { + if (volumeStates.getVolume(volume.getVolumeId()) != null) { + // volume already exists + return volumeStates.getVolume(volume.getVolumeId()); + } else { + // add the volume and set the client + ((VolumeImpl) volume).setClient(adaptorClient); + this.volumeStates.addVolumeIfAbsent(volume); + return volume; + } + } + + @VisibleForTesting + public void setClient(CsiAdaptorClientProtocol client) { + this.adaptorClient = client; + } + + @Override + public ScheduledFuture schedule( + VolumeProvisioningTask volumeProvisioningTask, + int delaySecond) { + LOG.info("Scheduling provision volume task (with delay " + + delaySecond + "s)," + " handling " + + volumeProvisioningTask.getVolumes().size() + + " volume provisioning"); + return provisioningExecutor.schedule(volumeProvisioningTask, + delaySecond, TimeUnit.SECONDS); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java new file mode 100644 index 00000000000..fcef3f7f270 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.server.volume.csi.VolumeId; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Volume manager states, including all managed volumes and their states. + */ +public class VolumeStates { + + private final Map volumeStates; + + public VolumeStates() { + this.volumeStates = new ConcurrentHashMap<>(); + } + + public Volume getVolume(VolumeId volumeId) { + return volumeStates.get(volumeId); + } + + /** + * Add volume if it is not yet added. + * If a new volume is added with a same {@link VolumeId} + * with a existing volume, existing volume will be returned. + * @param volume volume to add + * @return volume added or existing volume + */ + public Volume addVolumeIfAbsent(Volume volume) { + if (volume.getVolumeId() != null) { + return volumeStates.putIfAbsent(volume.getVolumeId(), volume); + } else { + // for dynamical provisioned volumes, + // the volume ID might not be available at time being. + // we can makeup one with the combination of driver+volumeName+timestamp + // once the volume ID is generated, we should replace ID. + return volume; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ControllerPublishVolumeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ControllerPublishVolumeEvent.java new file mode 100644 index 00000000000..3e294aa5e15 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ControllerPublishVolumeEvent.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event; + +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; + +/** + * Trigger controller publish. + */ +public class ControllerPublishVolumeEvent extends VolumeEvent { + + public ControllerPublishVolumeEvent(Volume volume) { + super(volume, VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ValidateVolumeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ValidateVolumeEvent.java new file mode 100644 index 00000000000..5e0c5e36968 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ValidateVolumeEvent.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event; + +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; + +/** + * Validate volume capability with the CSI driver. + */ +public class ValidateVolumeEvent extends VolumeEvent { + + public ValidateVolumeEvent(Volume volume) { + super(volume, VolumeEventType.VALIDATE_VOLUME_EVENT); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEvent.java new file mode 100644 index 00000000000..2a3388743c3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEvent.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.server.volume.csi.VolumeId; + +/** + * Base volume event class that used to trigger volume state transitions. + */ +public class VolumeEvent extends AbstractEvent { + + private Volume volume; + + public VolumeEvent(Volume volume, VolumeEventType volumeEventType) { + super(volumeEventType, System.currentTimeMillis()); + this.volume = volume; + } + + public Volume getVolume() { + return this.volume; + } + + public VolumeId getVolumeId() { + return this.volume.getVolumeId(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEventType.java new file mode 100644 index 00000000000..572e60d0bbd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEventType.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event; + +/** + * Volume events. + */ +public enum VolumeEventType { + VALIDATE_VOLUME_EVENT, + CREATE_VOLUME_EVENT, + CONTROLLER_PUBLISH_VOLUME_EVENT, + CONTROLLER_UNPUBLISH_VOLUME_EVENT, + DELETE_VOLUME +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/package-info.java new file mode 100644 index 00000000000..7d53281a559 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains volume related events. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java new file mode 100644 index 00000000000..68e89b0b34a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent; +import org.apache.hadoop.yarn.server.volume.csi.VolumeId; + +/** + * Major volume interface at RM's view, it maintains the volume states and + * state transition according to the CSI volume lifecycle. + */ +@Private +@Unstable +public interface Volume extends EventHandler { + + VolumeState getVolumeState(); + + VolumeId getVolumeId(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java new file mode 100644 index 00000000000..25150473b96 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.CsiAdaptorClient; +import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEventType; +import org.apache.hadoop.yarn.state.InvalidStateTransitionException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.server.volume.csi.VolumeId; +import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; + +import java.util.EnumSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * This class maintains the volume states and state transition + * according to the CSI volume lifecycle. Volume states are stored in + * {@link org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeStates} + * class. + */ +public class VolumeImpl implements Volume { + + private static final Log LOG = LogFactory.getLog(VolumeImpl.class); + + private final Lock readLock; + private final Lock writeLock; + private final StateMachine + stateMachine; + + private final VolumeId volumeId; + private final VolumeMetaData volumeMeta; + private CsiAdaptorClientProtocol client; + + public VolumeImpl(VolumeMetaData volumeMeta) { + ReadWriteLock lock = new ReentrantReadWriteLock(); + this.writeLock = lock.writeLock(); + this.readLock = lock.readLock(); + this.volumeId = volumeMeta.getVolumeId(); + this.volumeMeta = volumeMeta; + this.stateMachine = createVolumeStateFactory().make(this); + this.client = new CsiAdaptorClient(); + } + + @VisibleForTesting + public void setClient(CsiAdaptorClientProtocol client) { + this.client = client; + } + + public CsiAdaptorClientProtocol getClient() { + return this.client; + } + + private StateMachineFactory createVolumeStateFactory() { + return new StateMachineFactory< + VolumeImpl, VolumeState, VolumeEventType, VolumeEvent>(VolumeState.NEW) + .addTransition( + VolumeState.NEW, + EnumSet.of(VolumeState.VALIDATED, VolumeState.UNAVAILABLE), + VolumeEventType.VALIDATE_VOLUME_EVENT, + new ValidateVolumeTransition()) + .addTransition(VolumeState.VALIDATED, VolumeState.VALIDATED, + VolumeEventType.VALIDATE_VOLUME_EVENT) + .addTransition( + VolumeState.VALIDATED, + EnumSet.of(VolumeState.NODE_READY, VolumeState.UNAVAILABLE), + VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT, + new ControllerPublishVolumeTransition()) + .addTransition( + VolumeState.UNAVAILABLE, + EnumSet.of(VolumeState.UNAVAILABLE, VolumeState.VALIDATED), + VolumeEventType.VALIDATE_VOLUME_EVENT, + new ValidateVolumeTransition()) + .addTransition( + VolumeState.UNAVAILABLE, + VolumeState.UNAVAILABLE, + EnumSet.of(VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT)) + .addTransition( + VolumeState.NODE_READY, + VolumeState.NODE_READY, + EnumSet.of(VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT, + VolumeEventType.VALIDATE_VOLUME_EVENT)) + .installTopology(); + } + + @Override + public VolumeState getVolumeState() { + try { + readLock.lock(); + return stateMachine.getCurrentState(); + } finally { + readLock.unlock(); + } + } + + @Override + public VolumeId getVolumeId() { + try { + readLock.lock(); + return this.volumeId; + } finally { + readLock.unlock(); + } + } + + private static class ValidateVolumeTransition + implements MultipleArcTransition { + @Override + public VolumeState transition(VolumeImpl volume, + VolumeEvent volumeEvent) { + try { + // this call could cross node, we should keep the message tight + volume.getClient().validateVolume(); + return VolumeState.VALIDATED; + } catch (VolumeException e) { + LOG.warn("Got exception while calling the CSI adaptor", e); + return VolumeState.UNAVAILABLE; + } + } + } + + private static class ControllerPublishVolumeTransition + implements MultipleArcTransition { + + @Override + public VolumeState transition(VolumeImpl volume, + VolumeEvent volumeEvent) { + try { + // this call could cross node, we should keep the message tight + volume.getClient().controllerPublishVolume(); + return VolumeState.NODE_READY; + } catch (VolumeException e) { + LOG.warn("Got exception while calling the CSI adaptor", e); + return volume.getVolumeState(); + } + } + } + + @Override + public void handle(VolumeEvent event) { + try { + this.writeLock.lock(); + VolumeId volumeId = event.getVolumeId(); + + if (volumeId == null) { + // This should not happen, safely ignore the event + LOG.warn("Unexpected volume event received, event type is " + + event.getType().name() + ", but the volumeId is null."); + return; + } + + LOG.info("Processing volume event, type=" + event.getType().name() + + ", volumeId=" + volumeId.toString()); + + VolumeState oldState = null; + VolumeState newState = null; + try { + oldState = stateMachine.getCurrentState(); + newState = stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitionException e) { + LOG.warn("Can't handle this event at current state: Current: [" + + oldState + "], eventType: [" + event.getType() + "]," + + " volumeId: [" + volumeId + "]", e); + } + + if (newState != null && oldState != newState) { + LOG.info("VolumeImpl " + volumeId + " transitioned from " + oldState + + " to " + newState); + } + }finally { + this.writeLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java new file mode 100644 index 00000000000..9beb09ad192 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +/** + * Volume states + * Volume states are defined in the CSI spec, see more in volume lifecycle. + */ +public enum VolumeState { + // initial state + NEW, + // volume capacity validated + VALIDATED, + // volume created by the controller + CREATED, + // controller published the volume + NODE_READY, + // unavailable + UNAVAILABLE +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java new file mode 100644 index 00000000000..a9dd3896643 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes to manage volume lifecycle. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java new file mode 100644 index 00000000000..5d71617d52b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes to manage CSI volumes. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java new file mode 100644 index 00000000000..f275768c38b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor; + +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask; +import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeProvisioningException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * AMS processor that handles volume resource requests. + * + */ +public class VolumeAMSProcessor implements ApplicationMasterServiceProcessor { + + private static final Logger LOG = LoggerFactory + .getLogger(VolumeAMSProcessor.class); + + private ApplicationMasterServiceProcessor nextAMSProcessor; + private VolumeManager volumeManager; + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor nextProcessor) { + LOG.info("Initializing CSI volume processor"); + this.nextAMSProcessor = nextProcessor; + this.volumeManager = ((RMContext) amsContext).getVolumeManager(); + } + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + this.nextAMSProcessor.registerApplicationMaster(applicationAttemptId, + request, response); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + List volumes = aggregateVolumesFrom(request); + if (volumes != null && volumes.size() > 0) { + ScheduledFuture result = + this.volumeManager.schedule(new VolumeProvisioningTask(volumes), 0); + try { + VolumeProvisioningResults volumeResult = + result.get(3, TimeUnit.SECONDS); + if (!volumeResult.isSuccess()) { + throw new VolumeProvisioningException("Volume provisioning failed," + + " result details: " + volumeResult.getBriefMessage()); + } + } catch (TimeoutException | InterruptedException | ExecutionException e) { + LOG.warn("Volume provisioning task failed", e); + throw new VolumeException("Volume provisioning task failed", e); + } + } + + // Go to next processor + this.nextAMSProcessor.allocate(appAttemptId, request, response); + } + + // Currently only scheduling request is supported. + private List aggregateVolumesFrom(AllocateRequest request) + throws VolumeException { + List volumeList = new ArrayList<>(); + List requests = request.getSchedulingRequests(); + if (requests != null) { + for (SchedulingRequest req : requests) { + Resource totalResource = req.getResourceSizing().getResources(); + List resourceList = + totalResource.getAllResourcesListCopy(); + for (ResourceInformation resourceInformation : resourceList) { + List volumes = + VolumeMetaData.fromResource(resourceInformation); + for (VolumeMetaData vs : volumes) { + if (vs.getVolumeCapabilityRange().getMinCapacity() <= 0) { + // capacity not specified, ignore + continue; + } else if (vs.isProvisionedVolume()) { + volumeList.add(checkAndGetVolume(vs)); + } else { + throw new InvalidVolumeException("Only pre-provisioned volume" + + " is supported now, volumeID must exist."); + } + } + } + } + } + return volumeList; + } + + /** + * If given volume ID already exists in the volume manager, + * it returns the existing volume. Otherwise, it creates a new + * volume and add that to volume manager. + * @param metaData + * @return volume + */ + private Volume checkAndGetVolume(VolumeMetaData metaData) { + Volume toAdd = new VolumeImpl(metaData); + return this.volumeManager.addOrGetVolume(toAdd); + } + + @Override + public void finishApplicationMaster( + ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + this.nextAMSProcessor.finishApplicationMaster(applicationAttemptId, + request, response); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java new file mode 100644 index 00000000000..788a4179a80 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains AMS processor class for volume handling. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java new file mode 100644 index 00000000000..47f0df2e70b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +import java.util.concurrent.Callable; + +/** + * A task interface to provision a volume to expected state. + */ +@Private +@Unstable +public interface VolumeProvisioner extends Callable { + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java new file mode 100644 index 00000000000..657fa6c97fd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner; + +import com.google.gson.JsonObject; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState; +import org.apache.hadoop.yarn.server.volume.csi.VolumeId; + +import java.util.HashMap; +import java.util.Map; + +/** + * Result of volumes' provisioning. + */ +public class VolumeProvisioningResults { + + private Map resultMap; + + public VolumeProvisioningResults() { + this.resultMap = new HashMap<>(); + } + + public boolean isSuccess() { + return !resultMap.isEmpty() && resultMap.values().stream() + .allMatch(subResult -> subResult.isSuccess()); + } + + public String getBriefMessage() { + JsonObject obj = new JsonObject(); + obj.addProperty("TotalVolumes", resultMap.size()); + + JsonObject failed = new JsonObject(); + for (VolumeProvisioningResult result : resultMap.values()) { + if (!result.isSuccess()) { + failed.addProperty(result.getVolumeId().toString(), + result.getVolumeState().name()); + } + } + obj.add("failedVolumesStates", failed); + return obj.toString(); + } + + static class VolumeProvisioningResult { + + private VolumeId volumeId; + private VolumeState volumeState; + private boolean success; + + VolumeProvisioningResult(VolumeId volumeId, VolumeState state) { + this.volumeId = volumeId; + this.volumeState = state; + this.success = state == VolumeState.NODE_READY; + } + + public boolean isSuccess() { + return this.success; + } + + public VolumeId getVolumeId() { + return this.volumeId; + } + + public VolumeState getVolumeState() { + return this.volumeState; + } + } + + public void addResult(VolumeId volumeId, VolumeState state) { + this.resultMap.put(volumeId, + new VolumeProvisioningResult(volumeId, state)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java new file mode 100644 index 00000000000..eb35431ea3a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner; + +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * A provisioning task encapsulates all the logic required by a storage system + * to provision a volume. This class is the common implementation, it might + * be override if the provisioning behavior of a certain storage system + * is not completely align with this implementation. + */ +public class VolumeProvisioningTask implements VolumeProvisioner { + + private static final Logger LOG = LoggerFactory + .getLogger(VolumeProvisioningTask.class); + + private List volumes; + + public VolumeProvisioningTask(List volumes) { + this.volumes = volumes; + } + + public List getVolumes() { + return this.volumes; + } + + @Override + public VolumeProvisioningResults call() throws Exception { + VolumeProvisioningResults vpr = new VolumeProvisioningResults(); + + // Wait all volumes are reaching expected state + for (Volume vs : volumes) { + LOG.info("Provisioning volume : {}", vs.getVolumeId().toString()); + vs.handle(new ValidateVolumeEvent(vs)); + vs.handle(new ControllerPublishVolumeEvent(vs)); + } + + // collect results + volumes.stream().forEach(v -> + vpr.addResult(v.getVolumeId(), v.getVolumeState())); + + return vpr; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java new file mode 100644 index 00000000000..92b4bdf1402 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains the volume provisioning classes. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java new file mode 100644 index 00000000000..3f9bc938f79 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes to manage storage volumes in YARN. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java new file mode 100644 index 00000000000..51328641121 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.server.volume.csi.VolumeCapabilityRange; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for volume capability. + */ +public class TestVolumeCapabilityRange { + + @Test(expected = InvalidVolumeException.class) + public void testInvalidMinCapability() throws InvalidVolumeException { + VolumeCapabilityRange.newBuilder() + .minCapacity(-1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + } + + @Test(expected = InvalidVolumeException.class) + public void testMissingMinCapability() throws InvalidVolumeException { + VolumeCapabilityRange.newBuilder() + .maxCapacity(5L) + .unit("Gi") + .build(); + } + + @Test(expected = InvalidVolumeException.class) + public void testMissingUnit() throws InvalidVolumeException { + VolumeCapabilityRange.newBuilder() + .minCapacity(0L) + .maxCapacity(5L) + .build(); + } + + @Test + public void testGetVolumeCapability() throws InvalidVolumeException { + VolumeCapabilityRange vc = VolumeCapabilityRange.newBuilder() + .minCapacity(0L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + Assert.assertEquals(0L, vc.getMinCapacity()); + Assert.assertEquals(5L, vc.getMaxCapacity()); + Assert.assertEquals("Gi", vc.getUnit()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java new file mode 100644 index 00000000000..18c23e8d6b7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent; +import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Mockito.*; + +/** + * Test cases for volume lifecycle management. + */ +public class TestVolumeLifecycle { + + @Test + public void testValidation() throws InvalidVolumeException { + VolumeImpl volume = (VolumeImpl) VolumeBuilder.newBuilder() + .volumeId("test_vol_00000001") + .maxCapability(5L) + .unit("Gi") + .mountPoint("/path/to/mount") + .driverName("test-driver-name") + .build(); + Assert.assertEquals(VolumeState.NEW, volume.getVolumeState()); + + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState()); + } + + @Test + public void testValidationFailure() throws VolumeException { + VolumeImpl volume = (VolumeImpl) VolumeBuilder + .newBuilder().volumeId("test_vol_00000001").build(); + CsiAdaptorClientProtocol mockedClient = Mockito + .mock(CsiAdaptorClientProtocol.class); + volume.setClient(mockedClient); + + // NEW -> UNAVAILABLE + // Simulate a failed API call to the adaptor + doThrow(new VolumeException("failed")).when(mockedClient).validateVolume(); + volume.handle(new ValidateVolumeEvent(volume)); + + try { + // Verify the countdown did not happen + GenericTestUtils.waitFor(() -> + volume.getVolumeState() == VolumeState.VALIDATED, 10, 50); + Assert.fail("Validate state not reached," + + " it should keep waiting until timeout"); + } catch (Exception e) { + Assert.assertTrue(e instanceof TimeoutException); + Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); + } + } + + @Test + public void testValidated() throws InvalidVolumeException { + AtomicInteger validatedTimes = new AtomicInteger(0); + VolumeImpl volume = (VolumeImpl) VolumeBuilder + .newBuilder().volumeId("test_vol_00000001").build(); + CsiAdaptorClientProtocol mockedClient = new CsiAdaptorClientProtocol() { + @Override + public void validateVolume() { + validatedTimes.incrementAndGet(); + } + + @Override + public void controllerPublishVolume() { + // do nothing + } + }; + // The client has a count to memorize how many times being called + volume.setClient(mockedClient); + + // NEW -> VALIDATED + Assert.assertEquals(VolumeState.NEW, volume.getVolumeState()); + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState()); + Assert.assertEquals(1, validatedTimes.get()); + + // VALIDATED -> VALIDATED + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState()); + Assert.assertEquals(1, validatedTimes.get()); + } + + @Test + public void testUnavailableState() throws VolumeException { + VolumeImpl volume = (VolumeImpl) VolumeBuilder + .newBuilder().volumeId("test_vol_00000001").build(); + CsiAdaptorClientProtocol mockedClient = Mockito + .mock(CsiAdaptorClientProtocol.class); + volume.setClient(mockedClient); + + // NEW -> UNAVAILABLE + doThrow(new VolumeException("failed")).when(mockedClient) + .validateVolume(); + Assert.assertEquals(VolumeState.NEW, volume.getVolumeState()); + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); + + // UNAVAILABLE -> UNAVAILABLE + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); + + // UNAVAILABLE -> VALIDATED + doNothing().when(mockedClient).validateVolume(); + volume.setClient(mockedClient); + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState()); + } + + @Test + public void testPublishUnavailableVolume() throws VolumeException { + VolumeImpl volume = (VolumeImpl) VolumeBuilder + .newBuilder().volumeId("test_vol_00000001").build(); + CsiAdaptorClientProtocol mockedClient = Mockito + .mock(CsiAdaptorClientProtocol.class); + volume.setClient(mockedClient); + + // NEW -> UNAVAILABLE (on validateVolume) + doThrow(new VolumeException("failed")).when(mockedClient) + .validateVolume(); + Assert.assertEquals(VolumeState.NEW, volume.getVolumeState()); + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); + + // UNAVAILABLE -> UNAVAILABLE (on publishVolume) + volume.handle(new ControllerPublishVolumeEvent(volume)); + // controller publish is not called since the state is UNAVAILABLE + verify(mockedClient, times(0)).controllerPublishVolume(); + // state remains to UNAVAILABLE + Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java new file mode 100644 index 00000000000..38dbe034e4d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.apache.hadoop.yarn.server.volume.csi.CsiConstants; +import org.apache.hadoop.yarn.server.volume.csi.VolumeCapabilityRange; +import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.server.volume.csi.VolumeId; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; + +/** + * Test cases for volume specification definition and parsing. + */ +public class TestVolumeMetaData { + + @Test + public void testPreprovisionedVolume() throws InvalidVolumeException { + VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + // When volume id is given, volume name is optional + VolumeMetaData meta = VolumeMetaData.newBuilder() + .volumeId(new VolumeId("id-000001")) + .capability(cap) + .driverName("csi-demo-driver") + .mountPoint("/mnt/data") + .build(); + + Assert.assertEquals(new VolumeId("id-000001"), meta.getVolumeId()); + Assert.assertEquals(1L, meta.getVolumeCapabilityRange().getMinCapacity()); + Assert.assertEquals(5L, meta.getVolumeCapabilityRange().getMaxCapacity()); + Assert.assertEquals("Gi", meta.getVolumeCapabilityRange().getUnit()); + Assert.assertEquals("csi-demo-driver", meta.getDriverName()); + Assert.assertEquals("/mnt/data", meta.getMountPoint()); + Assert.assertNull(meta.getVolumeName()); + Assert.assertTrue(meta.isProvisionedVolume()); + + // Test toString + JsonParser parser = new JsonParser(); + JsonElement element = parser.parse(meta.toString()); + JsonObject json = element.getAsJsonObject(); + Assert.assertNotNull(json); + Assert.assertNull(json.get(CsiConstants.CSI_VOLUME_NAME)); + Assert.assertEquals("id-000001", + json.get(CsiConstants.CSI_VOLUME_ID).getAsString()); + Assert.assertEquals("csi-demo-driver", + json.get(CsiConstants.CSI_DRIVER_NAME).getAsString()); + Assert.assertEquals("/mnt/data", + json.get(CsiConstants.CSI_VOLUME_MOUNT).getAsString()); + + } + + @Test + public void testDynamicalProvisionedVolume() throws InvalidVolumeException { + VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + // When volume name is given, volume id is optional + VolumeMetaData meta = VolumeMetaData.newBuilder() + .volumeName("volume-name") + .capability(cap) + .driverName("csi-demo-driver") + .mountPoint("/mnt/data") + .build(); + Assert.assertNotNull(meta); + + Assert.assertEquals("volume-name", meta.getVolumeName()); + Assert.assertEquals(1L, meta.getVolumeCapabilityRange().getMinCapacity()); + Assert.assertEquals(5L, meta.getVolumeCapabilityRange().getMaxCapacity()); + Assert.assertEquals("Gi", meta.getVolumeCapabilityRange().getUnit()); + Assert.assertEquals("csi-demo-driver", meta.getDriverName()); + Assert.assertEquals("/mnt/data", meta.getMountPoint()); + Assert.assertFalse(meta.isProvisionedVolume()); + + // Test toString + JsonParser parser = new JsonParser(); + JsonElement element = parser.parse(meta.toString()); + JsonObject json = element.getAsJsonObject(); + Assert.assertNotNull(json); + Assert.assertNull(json.get(CsiConstants.CSI_VOLUME_ID)); + Assert.assertEquals("volume-name", + json.get(CsiConstants.CSI_VOLUME_NAME).getAsString()); + Assert.assertEquals("csi-demo-driver", + json.get(CsiConstants.CSI_DRIVER_NAME).getAsString()); + Assert.assertEquals("/mnt/data", + json.get(CsiConstants.CSI_VOLUME_MOUNT).getAsString()); + } + + @Test(expected = InvalidVolumeException.class) + public void testMissingMountpoint() throws InvalidVolumeException { + VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + VolumeMetaData.newBuilder() + .volumeId(new VolumeId("id-000001")) + .capability(cap) + .driverName("csi-demo-driver") + .build(); + } + + + @Test(expected = InvalidVolumeException.class) + public void testMissingCsiDriverName() throws InvalidVolumeException { + VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + VolumeMetaData.newBuilder() + .volumeId(new VolumeId("id-000001")) + .capability(cap) + .mountPoint("/mnt/data") + .build(); + } + + @Test(expected = InvalidVolumeException.class) + public void testMissingVolumeCapability() throws InvalidVolumeException { + VolumeMetaData.newBuilder() + .volumeId(new VolumeId("id-000001")) + .driverName("csi-demo-driver") + .mountPoint("/mnt/data") + .build(); + } + + @Test + public void testVolumeId() { + VolumeId id1 = new VolumeId("test00001"); + VolumeId id11 = new VolumeId("test00001"); + VolumeId id2 = new VolumeId("test00002"); + + Assert.assertEquals(id1, id11); + Assert.assertEquals(id1.hashCode(), id11.hashCode()); + Assert.assertNotEquals(id1, id2); + + HashMap map = new HashMap<>(); + map.put(id1, "1"); + Assert.assertEquals(1, map.size()); + Assert.assertEquals("1", map.get(id11)); + map.put(id11, "2"); + Assert.assertEquals(1, map.size()); + Assert.assertEquals("2", map.get(id11)); + Assert.assertEquals("2", map.get(new VolumeId("test00001"))); + + Assert.assertNotEquals(id1, id2); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java new file mode 100644 index 00000000000..d6f9d920d11 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor; +import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; +import org.apache.hadoop.yarn.server.volume.csi.CsiConstants; +import org.apache.hadoop.yarn.server.volume.csi.VolumeId; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeProvisioningException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Arrays; + +/** + * Test cases for volume processor. + */ +public class TestVolumeProcessor { + + private static final int GB = 1024; + private YarnConfiguration conf; + private RMNodeLabelsManager mgr; + private MockRM rm; + private MockNM[] mockNMS; + private RMNode[] rmNodes; + private static final int NUM_OF_NMS = 4; + // resource-types.xml will be created under target/test-classes/ dir, + // it must be deleted after the test is done, to avoid it from reading + // by other UT classes. + private File resourceTypesFile = null; + + private static final String VOLUME_RESOURCE_NAME = "yarn.io/csi-volume"; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + resourceTypesFile = new File(conf.getClassLoader() + .getResource(".").getPath(), "resource-types.xml"); + writeTmpResourceTypesFile(resourceTypesFile); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + conf.set("yarn.scheduler.capacity.resource-calculator", + "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"); + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".default.ordering-policy", + "fair"); + conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS, + VolumeAMSProcessor.class.getName()); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + rm = new MockRM(conf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + mockNMS = new MockNM[NUM_OF_NMS]; + rmNodes = new RMNode[NUM_OF_NMS]; + for (int i = 0; i < 4; i++) { + mockNMS[i] = rm.registerNode("192.168.0." + i + ":1234", 10 * GB); + rmNodes[i] = rm.getRMContext().getRMNodes().get(mockNMS[i].getNodeId()); + } + } + + @After + public void tearDown() { + if (resourceTypesFile != null && resourceTypesFile.exists()) { + resourceTypesFile.delete(); + } + } + + private void writeTmpResourceTypesFile(File tmpFile) throws IOException { + FileWriter fw = new FileWriter(tmpFile); + try { + Configuration yarnConf = new YarnConfiguration(); + yarnConf.set(YarnConfiguration.RESOURCE_TYPES, VOLUME_RESOURCE_NAME); + yarnConf.set("yarn.resource-types." + + VOLUME_RESOURCE_NAME + ".units", "Mi"); + yarnConf.writeXml(fw); + } finally { + fw.close(); + } + } + + @Test (timeout = 10000L) + public void testVolumeProvisioning() throws Exception { + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]); + Resource resource = Resource.newInstance(1024, 1); + ResourceInformation volumeResource = ResourceInformation + .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024, + ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE, + ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG), + ImmutableMap.of( + CsiConstants.CSI_VOLUME_ID, "test-vol-000001", + CsiConstants.CSI_DRIVER_NAME, "hostpath", + CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data" + ) + ); + resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource); + SchedulingRequest sc = SchedulingRequest + .newBuilder().allocationRequestId(0L) + .resourceSizing(ResourceSizing.newInstance(1, resource)) + .build(); + AllocateRequest ar = AllocateRequest.newBuilder() + .schedulingRequests(Arrays.asList(sc)) + .build(); + + am1.allocate(ar); + VolumeStates volumeStates = + rm.getRMContext().getVolumeManager().getVolumeStates(); + Assert.assertNotNull(volumeStates); + VolumeState volumeState = VolumeState.NEW; + while (volumeState != VolumeState.NODE_READY) { + Volume volume = volumeStates + .getVolume(new VolumeId("test-vol-000001")); + if (volume != null) { + volumeState = volume.getVolumeState(); + } + am1.doHeartbeat(); + mockNMS[0].nodeHeartbeat(true); + Thread.sleep(500); + } + rm.stop(); + } + + @Test (timeout = 30000L) + public void testInvalidRequest() throws Exception { + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]); + Resource resource = Resource.newInstance(1024, 1); + ResourceInformation volumeResource = ResourceInformation + .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024, + ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE, + ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG), + ImmutableMap.of( + // volume ID is missing... + CsiConstants.CSI_VOLUME_NAME, "test-vol-000001", + CsiConstants.CSI_DRIVER_NAME, "hostpath", + CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data" + ) + ); + resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource); + SchedulingRequest sc = SchedulingRequest + .newBuilder().allocationRequestId(0L) + .resourceSizing(ResourceSizing.newInstance(1, resource)) + .build(); + AllocateRequest ar = AllocateRequest.newBuilder() + .schedulingRequests(Arrays.asList(sc)) + .build(); + + try { + am1.allocate(ar); + Assert.fail("allocate should fail because invalid request received"); + } catch (Exception e) { + Assert.assertTrue(e instanceof InvalidVolumeException); + } + rm.stop(); + } + + @Test (timeout = 30000L) + public void testProvisioningFailures() throws Exception { + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]); + + CsiAdaptorClientProtocol mockedClient = Mockito + .mock(CsiAdaptorClientProtocol.class); + // inject adaptor client + rm.getRMContext().getVolumeManager().setClient(mockedClient); + Mockito.doThrow(new VolumeException("failed")).when(mockedClient) + .validateVolume(); + + Resource resource = Resource.newInstance(1024, 1); + ResourceInformation volumeResource = ResourceInformation + .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024, + ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE, + ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG), + ImmutableMap.of( + CsiConstants.CSI_VOLUME_ID, "test-vol-000001", + CsiConstants.CSI_DRIVER_NAME, "hostpath", + CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data" + ) + ); + resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource); + SchedulingRequest sc = SchedulingRequest + .newBuilder().allocationRequestId(0L) + .resourceSizing(ResourceSizing.newInstance(1, resource)) + .build(); + AllocateRequest ar = AllocateRequest.newBuilder() + .schedulingRequests(Arrays.asList(sc)) + .build(); + + try { + am1.allocate(ar); + Assert.fail("allocate should fail"); + } catch (Exception e) { + Assert.assertTrue(e instanceof VolumeProvisioningException); + } + rm.stop(); + } +} \ No newline at end of file