diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteVolumeResponse.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteVolumeResponse.java new file mode 100644 index 00000000000..6e966745601 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteVolumeResponse.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.helpers; + +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .VolumeList; + +/** + * OM response for delete volume request for a ozone volume. + */ +public class OmDeleteVolumeResponse { + private String volume; + private String owner; + private VolumeList updatedVolumeList; + + public OmDeleteVolumeResponse(String volume, String owner, + VolumeList updatedVolumeList) { + this.volume = volume; + this.owner = owner; + this.updatedVolumeList = updatedVolumeList; + } + + public String getVolume() { + return volume; + } + + public String getOwner() { + return owner; + } + + public VolumeList getUpdatedVolumeList() { + return updatedVolumeList; + } +} \ No newline at end of file diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java index 08c17eccfc1..7b25d78704f 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java @@ -36,10 +36,10 @@ import com.google.common.base.Preconditions; */ public final class OmVolumeArgs extends WithMetadata implements Auditable { private final String adminName; - private final String ownerName; + private String ownerName; private final String volume; - private final long creationTime; - private final long quotaInBytes; + private long creationTime; + private long quotaInBytes; private final OmOzoneAclMap aclMap; /** @@ -64,6 +64,19 @@ public final class OmVolumeArgs extends WithMetadata implements Auditable { this.creationTime = creationTime; } + + public void setOwnerName(String newOwner) { + this.ownerName = newOwner; + } + + public void setQuotaInBytes(long quotaInBytes) { + this.quotaInBytes = quotaInBytes; + } + + public void setCreationTime(long time) { + this.creationTime = time; + } + /** * Returns the Admin Name. * @return String. diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeOwnerChangeResponse.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeOwnerChangeResponse.java new file mode 100644 index 00000000000..b691c730f12 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeOwnerChangeResponse.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.helpers; + +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .VolumeList; + +/** + * OM response for owner change request for a ozone volume. + */ +public class OmVolumeOwnerChangeResponse { + private VolumeList originalOwnerVolumeList; + private VolumeList newOwnerVolumeList; + private OmVolumeArgs newOwnerVolumeArgs; + private String originalOwner; + + public OmVolumeOwnerChangeResponse(VolumeList originalOwnerVolumeList, + VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs, + String originalOwner) { + this.originalOwnerVolumeList = originalOwnerVolumeList; + this.newOwnerVolumeList = newOwnerVolumeList; + this.newOwnerVolumeArgs = newOwnerVolumeArgs; + this.originalOwner = originalOwner; + } + + public String getOriginalOwner() { + return originalOwner; + } + + public VolumeList getOriginalOwnerVolumeList() { + return originalOwnerVolumeList; + } + + public VolumeList getNewOwnerVolumeList() { + return newOwnerVolumeList; + } + + public OmVolumeArgs getNewOwnerVolumeArgs() { + return newOwnerVolumeArgs; + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java index f5989979cf6..ad2bc316f5e 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java @@ -18,15 +18,20 @@ package org.apache.hadoop.ozone.om.protocol; +import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .KeyArgs; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .KeyInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .KeyLocation; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .VolumeList; import java.io.IOException; @@ -88,4 +93,77 @@ public interface OzoneManagerHAProtocol { */ OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs, String multipartUploadID) throws IOException; + + /** + * Start Create Volume Transaction. + * @param omVolumeArgs + * @return VolumeList + * @throws IOException + */ + VolumeList startCreateVolume(OmVolumeArgs omVolumeArgs) throws IOException; + + /** + * Apply Create Volume changes to OM DB. + * @param omVolumeArgs + * @param volumeList + * @throws IOException + */ + void applyCreateVolume(OmVolumeArgs omVolumeArgs, + VolumeList volumeList) throws IOException; + + /** + * Start setOwner Transaction. + * @param volume + * @param owner + * @return OmVolumeOwnerChangeResponse + * @throws IOException + */ + OmVolumeOwnerChangeResponse startSetOwner(String volume, + String owner) throws IOException; + + /** + * Apply Set Quota changes to OM DB. + * @param oldOwner + * @param oldOwnerVolumeList + * @param newOwnerVolumeList + * @param newOwnerVolumeArgs + * @throws IOException + */ + void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList, + VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs) + throws IOException; + + /** + * Start Set Quota Transaction. + * @param volume + * @param quota + * @return OmVolumeArgs + * @throws IOException + */ + OmVolumeArgs startSetQuota(String volume, long quota) throws IOException; + + /** + * Apply Set Quota Changes to OM DB. + * @param omVolumeArgs + * @throws IOException + */ + void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException; + + /** + * Start Delete Volume Transaction. + * @param volume + * @return OmDeleteVolumeResponse + * @throws IOException + */ + OmDeleteVolumeResponse startDeleteVolume(String volume) throws IOException; + + /** + * Apply Delete Volume changes to OM DB. + * @param volume + * @param owner + * @param newVolumeList + * @throws IOException + */ + void applyDeleteVolume(String volume, String owner, + VolumeList newVolumeList) throws IOException; } diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index ffd1eba8f81..4536e8754b2 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -273,6 +273,7 @@ message VolumeInfo { */ message CreateVolumeRequest { required VolumeInfo volumeInfo = 1; + optional VolumeList volumeList = 2; } message CreateVolumeResponse { @@ -290,6 +291,10 @@ message SetVolumePropertyRequest { required string volumeName = 1; optional string ownerName = 2; optional uint64 quotaInBytes = 3; + optional string originalOwner = 4; + optional VolumeList oldOwnerVolumeList = 5; + optional VolumeList newOwnerVolumeList = 6; + optional VolumeInfo volumeInfo = 7; } message SetVolumePropertyResponse { @@ -326,6 +331,8 @@ message InfoVolumeResponse { */ message DeleteVolumeRequest { required string volumeName = 1; + optional string owner = 2; + optional VolumeList volumeList = 3; } message DeleteVolumeResponse { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java index b72117ea1cc..5d739c2988a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java @@ -87,11 +87,11 @@ public class TestOmMetrics { ozoneManager, "volumeManager"); VolumeManager mockVm = Mockito.spy(volumeManager); - Mockito.doNothing().when(mockVm).createVolume(null); - Mockito.doNothing().when(mockVm).deleteVolume(null); + Mockito.doReturn(null).when(mockVm).createVolume(null); + Mockito.doReturn(null).when(mockVm).deleteVolume(null); Mockito.doReturn(null).when(mockVm).getVolumeInfo(null); Mockito.doReturn(true).when(mockVm).checkVolumeAccess(null, null); - Mockito.doNothing().when(mockVm).setOwner(null, null); + Mockito.doReturn(null).when(mockVm).setOwner(null, null); Mockito.doReturn(null).when(mockVm).listVolumes(null, null, null, 0); HddsWhiteboxTestUtils.setInternalState( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index 06009e20803..5f62af131c3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -24,11 +24,13 @@ import org.apache.hadoop.hdfs.LogVerificationAppender; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; +import org.apache.hadoop.ozone.OzoneTestUtils; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; @@ -123,6 +125,33 @@ public class TestOzoneManagerHA { } } + @Test + public void testAllVolumeOperations() throws Exception { + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() + .setOwner(userName) + .setAdmin(adminName) + .build(); + + objectStore.createVolume(volumeName, createVolumeArgs); + OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); + + Assert.assertTrue(retVolumeinfo.getName().equals(volumeName)); + Assert.assertTrue(retVolumeinfo.getOwner().equals(userName)); + Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName)); + + objectStore.deleteVolume(volumeName); + + OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND, + () -> objectStore.getVolume(volumeName)); + + OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND, + () -> objectStore.deleteVolume(volumeName)); + } + /** * Test a client request when all OM nodes are running. The request should * succeed. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 7a87b537f24..e7f1e87e0da 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -71,6 +71,8 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ozone.OzoneIllegalArgumentException; import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; +import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse; +import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse; import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -79,6 +81,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .KeyInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .KeyLocation; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .VolumeList; import org.apache.hadoop.ozone.security.OzoneSecurityException; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -1630,6 +1634,79 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl } } + @Override + public VolumeList startCreateVolume(OmVolumeArgs args) throws IOException { + // TODO: Need to add metrics and Audit log for HA requests + if(isAclEnabled) { + checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.CREATE, + args.getVolume(), null, null); + } + VolumeList volumeList = volumeManager.createVolume(args); + return volumeList; + } + + public void applyCreateVolume(OmVolumeArgs omVolumeArgs, + VolumeList volumeList) throws IOException { + // TODO: Need to add metrics and Audit log for HA requests + volumeManager.applyCreateVolume(omVolumeArgs, volumeList); + } + + @Override + public OmVolumeOwnerChangeResponse startSetOwner(String volume, + String owner) throws IOException { + // TODO: Need to add metrics and Audit log for HA requests + if (isAclEnabled) { + checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.WRITE_ACL, volume, + null, null); + } + return volumeManager.setOwner(volume, owner); + } + + @Override + public void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList, + VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs) + throws IOException { + // TODO: Need to add metrics and Audit log for HA requests + volumeManager.applySetOwner(oldOwner, oldOwnerVolumeList, + newOwnerVolumeList, newOwnerVolumeArgs); + } + + @Override + public OmVolumeArgs startSetQuota(String volume, long quota) + throws IOException { + // TODO: Need to add metrics and Audit log for HA requests + if (isAclEnabled) { + checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.WRITE_ACL, volume, + null, null); + } + return volumeManager.setQuota(volume, quota); + } + + @Override + public void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException { + // TODO: Need to add metrics and Audit log for HA requests + volumeManager.applySetQuota(omVolumeArgs); + } + + @Override + public OmDeleteVolumeResponse startDeleteVolume(String volume) + throws IOException { + if(isAclEnabled) { + checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.DELETE, volume, + null, null); + } + // TODO: Need to add metrics and Audit log for HA requests + return volumeManager.deleteVolume(volume); + } + + @Override + public void applyDeleteVolume(String volume, String owner, + VolumeList newVolumeList) throws IOException { + // TODO: Need to add metrics and Audit log for HA requests + volumeManager.applyDeleteVolume(volume, owner, newVolumeList); + } + + /** * Checks if current caller has acl permissions. * diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java index d0d84f8dd27..440a45e8647 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java @@ -50,6 +50,7 @@ public class S3BucketManagerImpl implements S3BucketManager { private final OMMetadataManager omMetadataManager; private final VolumeManager volumeManager; private final BucketManager bucketManager; + private final boolean isRatisEnabled; /** * Construct an S3 Bucket Manager Object. @@ -66,6 +67,9 @@ public class S3BucketManagerImpl implements S3BucketManager { this.omMetadataManager = omMetadataManager; this.volumeManager = volumeManager; this.bucketManager = bucketManager; + isRatisEnabled = configuration.getBoolean( + OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, + OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); } @Override @@ -166,7 +170,12 @@ public class S3BucketManagerImpl implements S3BucketManager { .setVolume(ozoneVolumeName) .setQuotaInBytes(OzoneConsts.MAX_QUOTA_IN_BYTES) .build(); - volumeManager.createVolume(args); + if (isRatisEnabled) { + // When ratis is enabled we need to call apply also. + volumeManager.applyCreateVolume(args, volumeManager.createVolume(args)); + } else { + volumeManager.createVolume(args); + } } catch (OMException exp) { newVolumeCreate = false; if (exp.getResult().compareTo(VOLUME_ALREADY_EXISTS) == 0) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java index f25fce4ca20..a4e20c72967 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java @@ -16,9 +16,13 @@ */ package org.apache.hadoop.ozone.om; +import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse; import org.apache.hadoop.ozone.protocol.proto .OzoneManagerProtocolProtos.OzoneAclInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .VolumeList; import java.io.IOException; import java.util.List; @@ -32,7 +36,17 @@ public interface VolumeManager { * Create a new volume. * @param args - Volume args to create a volume */ - void createVolume(OmVolumeArgs args) throws IOException; + VolumeList createVolume(OmVolumeArgs args) + throws IOException; + + /** + * Apply Create Volume changes to OM DB. + * @param omVolumeArgs + * @param volumeList + * @throws IOException + */ + void applyCreateVolume(OmVolumeArgs omVolumeArgs, + VolumeList volumeList) throws IOException; /** * Changes the owner of a volume. @@ -41,7 +55,20 @@ public interface VolumeManager { * @param owner - Name of the owner. * @throws IOException */ - void setOwner(String volume, String owner) throws IOException; + OmVolumeOwnerChangeResponse setOwner(String volume, String owner) + throws IOException; + + /** + * Apply Set Owner changes to OM DB. + * @param oldOwner + * @param oldOwnerVolumeList + * @param newOwnerVolumeList + * @param newOwnerVolumeArgs + * @throws IOException + */ + void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList, + VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs) + throws IOException; /** * Changes the Quota on a volume. @@ -50,7 +77,14 @@ public interface VolumeManager { * @param quota - Quota in bytes. * @throws IOException */ - void setQuota(String volume, long quota) throws IOException; + OmVolumeArgs setQuota(String volume, long quota) throws IOException; + + /** + * Apply Set Quota changes to OM DB. + * @param omVolumeArgs + * @throws IOException + */ + void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException; /** * Gets the volume information. @@ -66,7 +100,17 @@ public interface VolumeManager { * @param volume - Name of the volume. * @throws IOException */ - void deleteVolume(String volume) throws IOException; + OmDeleteVolumeResponse deleteVolume(String volume) throws IOException; + + /** + * Apply Delete Volume changes to OM DB. + * @param volume + * @param owner + * @param newVolumeList + * @throws IOException + */ + void applyDeleteVolume(String volume, String owner, + VolumeList newVolumeList) throws IOException; /** * Checks if the specified user with a role can access this volume. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java index c83f73359f5..872d7b66740 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java @@ -22,17 +22,18 @@ import java.util.List; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList; -import org.apache.hadoop.utils.RocksDBStore; import org.apache.hadoop.utils.db.BatchOperation; import com.google.common.base.Preconditions; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; -import org.rocksdb.RocksDBException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,7 @@ public class VolumeManagerImpl implements VolumeManager { private final OMMetadataManager metadataManager; private final int maxUserVolumeCount; + private final boolean isRatisEnabled; /** * Constructor. @@ -52,15 +54,18 @@ public class VolumeManagerImpl implements VolumeManager { * @throws IOException */ public VolumeManagerImpl(OMMetadataManager metadataManager, - OzoneConfiguration conf) throws IOException { + OzoneConfiguration conf) { this.metadataManager = metadataManager; this.maxUserVolumeCount = conf.getInt(OZONE_OM_USER_MAX_VOLUME, OZONE_OM_USER_MAX_VOLUME_DEFAULT); + isRatisEnabled = conf.getBoolean( + OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, + OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); } // Helpers to add and delete volume from user list - private void addVolumeToOwnerList(String volume, String owner, - BatchOperation batchOperation) throws IOException { + private VolumeList addVolumeToOwnerList(String volume, String owner) + throws IOException { // Get the volume list String dbUserKey = metadataManager.getUserKey(owner); VolumeList volumeList = metadataManager.getUserTable().get(dbUserKey); @@ -72,22 +77,22 @@ public class VolumeManagerImpl implements VolumeManager { // Check the volume count if (prevVolList.size() >= maxUserVolumeCount) { LOG.debug("Too many volumes for user:{}", owner); - throw new OMException(ResultCodes.USER_TOO_MANY_VOLUMES); + throw new OMException("Too many volumes for user:" + owner, + ResultCodes.USER_TOO_MANY_VOLUMES); } // Add the new volume to the list prevVolList.add(volume); VolumeList newVolList = VolumeList.newBuilder() .addAllVolumeNames(prevVolList).build(); - metadataManager.getUserTable().putWithBatch(batchOperation, - dbUserKey, newVolList); + + return newVolList; } - private void delVolumeFromOwnerList(String volume, String owner, - BatchOperation batch) throws RocksDBException, IOException { + private VolumeList delVolumeFromOwnerList(String volume, String owner) + throws IOException { // Get the volume list - String dbUserKey = metadataManager.getUserKey(owner); - VolumeList volumeList = metadataManager.getUserTable().get(dbUserKey); + VolumeList volumeList = metadataManager.getUserTable().get(owner); List prevVolList = new ArrayList<>(); if (volumeList != null) { prevVolList.addAll(volumeList.getVolumeNamesList()); @@ -98,58 +103,90 @@ public class VolumeManagerImpl implements VolumeManager { // Remove the volume from the list prevVolList.remove(volume); - if (prevVolList.size() == 0) { - metadataManager.getUserTable().deleteWithBatch(batch, dbUserKey); - } else { - VolumeList newVolList = VolumeList.newBuilder() - .addAllVolumeNames(prevVolList).build(); - metadataManager.getUserTable().putWithBatch(batch, - dbUserKey, newVolList); - } + VolumeList newVolList = VolumeList.newBuilder() + .addAllVolumeNames(prevVolList).build(); + return newVolList; } /** * Creates a volume. - * @param args - OmVolumeArgs. + * @param omVolumeArgs - OmVolumeArgs. + * @return VolumeList */ @Override - public void createVolume(OmVolumeArgs args) throws IOException { - Preconditions.checkNotNull(args); - metadataManager.getLock().acquireUserLock(args.getOwnerName()); - metadataManager.getLock().acquireVolumeLock(args.getVolume()); + public VolumeList createVolume(OmVolumeArgs omVolumeArgs) throws IOException { + Preconditions.checkNotNull(omVolumeArgs); + metadataManager.getLock().acquireUserLock(omVolumeArgs.getOwnerName()); + metadataManager.getLock().acquireVolumeLock(omVolumeArgs.getVolume()); try { - String dbVolumeKey = metadataManager.getVolumeKey(args.getVolume()); + String dbVolumeKey = metadataManager.getVolumeKey( + omVolumeArgs.getVolume()); + String dbUserKey = metadataManager.getUserKey( + omVolumeArgs.getOwnerName()); OmVolumeArgs volumeInfo = metadataManager.getVolumeTable().get(dbVolumeKey); // Check of the volume already exists if (volumeInfo != null) { - LOG.debug("volume:{} already exists", args.getVolume()); + LOG.debug("volume:{} already exists", omVolumeArgs.getVolume()); throw new OMException(ResultCodes.VOLUME_ALREADY_EXISTS); } - try (BatchOperation batch = metadataManager.getStore() - .initBatchOperation()) { - // Write the vol info - metadataManager.getVolumeTable().putWithBatch(batch, - dbVolumeKey, args); + VolumeList volumeList = addVolumeToOwnerList(omVolumeArgs.getVolume(), + omVolumeArgs.getOwnerName()); - // Add volume to user list - addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch); - metadataManager.getStore().commitBatchOperation(batch); + // Set creation time + omVolumeArgs.setCreationTime(System.currentTimeMillis()); + + if (!isRatisEnabled) { + createVolumeCommitToDB(omVolumeArgs, volumeList, dbVolumeKey, + dbUserKey); } - LOG.debug("created volume:{} user:{}", args.getVolume(), - args.getOwnerName()); + LOG.debug("created volume:{} user:{}", omVolumeArgs.getVolume(), + omVolumeArgs.getOwnerName()); + return volumeList; } catch (IOException ex) { if (!(ex instanceof OMException)) { LOG.error("Volume creation failed for user:{} volume:{}", - args.getOwnerName(), args.getVolume(), ex); - } else { - throw (IOException) ex; + omVolumeArgs.getOwnerName(), omVolumeArgs.getVolume(), ex); } + throw ex; } finally { - metadataManager.getLock().releaseVolumeLock(args.getVolume()); - metadataManager.getLock().releaseUserLock(args.getOwnerName()); + metadataManager.getLock().releaseVolumeLock(omVolumeArgs.getVolume()); + metadataManager.getLock().releaseUserLock(omVolumeArgs.getOwnerName()); + } + } + + + @Override + public void applyCreateVolume(OmVolumeArgs omVolumeArgs, + VolumeList volumeList) throws IOException { + // Do we need to hold lock in apply Transactions requests? + String dbVolumeKey = metadataManager.getVolumeKey(omVolumeArgs.getVolume()); + String dbUserKey = metadataManager.getUserKey(omVolumeArgs.getOwnerName()); + try { + createVolumeCommitToDB(omVolumeArgs, volumeList, dbVolumeKey, dbUserKey); + } catch (IOException ex) { + LOG.error("Volume creation failed for user:{} volume:{}", + omVolumeArgs.getOwnerName(), omVolumeArgs.getVolume(), ex); + throw ex; + } + } + + private void createVolumeCommitToDB(OmVolumeArgs omVolumeArgs, + VolumeList volumeList, String dbVolumeKey, String dbUserKey) + throws IOException { + try (BatchOperation batch = metadataManager.getStore() + .initBatchOperation()) { + // Write the vol info + metadataManager.getVolumeTable().putWithBatch(batch, dbVolumeKey, + omVolumeArgs); + metadataManager.getUserTable().putWithBatch(batch, dbUserKey, + volumeList); + // Add volume to user list + metadataManager.getStore().commitBatchOperation(batch); + } catch (IOException ex) { + throw ex; } } @@ -161,7 +198,8 @@ public class VolumeManagerImpl implements VolumeManager { * @throws IOException */ @Override - public void setOwner(String volume, String owner) throws IOException { + public OmVolumeOwnerChangeResponse setOwner(String volume, String owner) + throws IOException { Preconditions.checkNotNull(volume); Preconditions.checkNotNull(owner); metadataManager.getLock().acquireUserLock(owner); @@ -179,49 +217,84 @@ public class VolumeManagerImpl implements VolumeManager { Preconditions.checkState(volume.equals(volumeArgs.getVolume())); - try (BatchOperation batch = metadataManager.getStore() - .initBatchOperation()) { - delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch); - addVolumeToOwnerList(volume, owner, batch); + String originalOwner = + metadataManager.getUserKey(volumeArgs.getOwnerName()); + VolumeList oldOwnerVolumeList = delVolumeFromOwnerList(volume, + originalOwner); - OmVolumeArgs newVolumeArgs = - OmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume()) - .setAdminName(volumeArgs.getAdminName()) - .setOwnerName(owner) - .setQuotaInBytes(volumeArgs.getQuotaInBytes()) - .setCreationTime(volumeArgs.getCreationTime()) - .build(); + String newOwner = metadataManager.getUserKey(owner); + VolumeList newOwnerVolumeList = addVolumeToOwnerList(volume, newOwner); - metadataManager.getVolumeTable().putWithBatch(batch, - dbVolumeKey, newVolumeArgs); - metadataManager.getStore().commitBatchOperation(batch); + volumeArgs.setOwnerName(owner); + if (!isRatisEnabled) { + setOwnerCommitToDB(oldOwnerVolumeList, newOwnerVolumeList, + volumeArgs, owner); } - } catch (RocksDBException | IOException ex) { + return new OmVolumeOwnerChangeResponse(oldOwnerVolumeList, + newOwnerVolumeList, volumeArgs, originalOwner); + } catch (IOException ex) { if (!(ex instanceof OMException)) { LOG.error("Changing volume ownership failed for user:{} volume:{}", owner, volume, ex); } - if(ex instanceof RocksDBException) { - throw RocksDBStore.toIOException("Volume creation failed.", - (RocksDBException) ex); - } else { - throw (IOException) ex; - } + throw ex; } finally { metadataManager.getLock().releaseVolumeLock(volume); metadataManager.getLock().releaseUserLock(owner); } } + @Override + public void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList, + VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs) + throws IOException { + try { + setOwnerCommitToDB(oldOwnerVolumeList, newOwnerVolumeList, + newOwnerVolumeArgs, oldOwner); + } catch (IOException ex) { + LOG.error("Changing volume ownership failed for user:{} volume:{}", + newOwnerVolumeArgs.getOwnerName(), newOwnerVolumeArgs.getVolume(), + ex); + throw ex; + } + } + + + private void setOwnerCommitToDB(VolumeList oldOwnerVolumeList, + VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs, + String oldOwner) throws IOException { + try (BatchOperation batch = metadataManager.getStore() + .initBatchOperation()) { + if (oldOwnerVolumeList.getVolumeNamesList().size() == 0) { + metadataManager.getUserTable().deleteWithBatch(batch, oldOwner); + } else { + metadataManager.getUserTable().putWithBatch(batch, oldOwner, + oldOwnerVolumeList); + } + metadataManager.getUserTable().putWithBatch(batch, + newOwnerVolumeArgs.getOwnerName(), + newOwnerVolumeList); + + String dbVolumeKey = + metadataManager.getVolumeKey(newOwnerVolumeArgs.getVolume()); + metadataManager.getVolumeTable().putWithBatch(batch, + dbVolumeKey, newOwnerVolumeArgs); + metadataManager.getStore().commitBatchOperation(batch); + } + } + + /** * Changes the Quota on a volume. * * @param volume - Name of the volume. * @param quota - Quota in bytes. + * + * @return OmVolumeArgs * @throws IOException */ @Override - public void setQuota(String volume, long quota) throws IOException { + public OmVolumeArgs setQuota(String volume, long quota) throws IOException { Preconditions.checkNotNull(volume); metadataManager.getLock().acquireVolumeLock(volume); try { @@ -235,15 +308,13 @@ public class VolumeManagerImpl implements VolumeManager { Preconditions.checkState(volume.equals(volumeArgs.getVolume())); - OmVolumeArgs newVolumeArgs = - OmVolumeArgs.newBuilder() - .setVolume(volumeArgs.getVolume()) - .setAdminName(volumeArgs.getAdminName()) - .setOwnerName(volumeArgs.getOwnerName()) - .setQuotaInBytes(quota) - .setCreationTime(volumeArgs.getCreationTime()).build(); - metadataManager.getVolumeTable().put(dbVolumeKey, newVolumeArgs); + volumeArgs.setQuotaInBytes(quota); + + if (!isRatisEnabled) { + metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs); + } + return volumeArgs; } catch (IOException ex) { if (!(ex instanceof OMException)) { LOG.error("Changing volume quota failed for volume:{} quota:{}", volume, @@ -255,6 +326,19 @@ public class VolumeManagerImpl implements VolumeManager { } } + @Override + public void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException { + try { + String dbVolumeKey = metadataManager.getVolumeKey( + omVolumeArgs.getVolume()); + metadataManager.getVolumeTable().put(dbVolumeKey, omVolumeArgs); + } catch (IOException ex) { + LOG.error("Changing volume quota failed for volume:{} quota:{}", + omVolumeArgs.getVolume(), omVolumeArgs.getQuotaInBytes(), ex); + throw ex; + } + } + /** * Gets the volume information. * @param volume - Volume name. @@ -290,10 +374,12 @@ public class VolumeManagerImpl implements VolumeManager { * Deletes an existing empty volume. * * @param volume - Name of the volume. + * + * @return OmDeleteVolumeResponse * @throws IOException */ @Override - public void deleteVolume(String volume) throws IOException { + public OmDeleteVolumeResponse deleteVolume(String volume) throws IOException { Preconditions.checkNotNull(volume); String owner; metadataManager.getLock().acquireVolumeLock(volume); @@ -305,7 +391,6 @@ public class VolumeManagerImpl implements VolumeManager { metadataManager.getLock().acquireUserLock(owner); metadataManager.getLock().acquireVolumeLock(volume); try { - String dbVolumeKey = metadataManager.getVolumeKey(volume); OmVolumeArgs volumeArgs = metadataManager.getVolumeTable().get(dbVolumeKey); @@ -322,28 +407,54 @@ public class VolumeManagerImpl implements VolumeManager { Preconditions.checkState(volume.equals(volumeArgs.getVolume())); // delete the volume from the owner list // as well as delete the volume entry - try (BatchOperation batch = metadataManager.getStore() - .initBatchOperation()) { - delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch); - metadataManager.getVolumeTable().deleteWithBatch(batch, dbVolumeKey); - metadataManager.getStore().commitBatchOperation(batch); + VolumeList newVolumeList = delVolumeFromOwnerList(volume, + volumeArgs.getOwnerName()); + + if (!isRatisEnabled) { + deleteVolumeCommitToDB(newVolumeList, + volume, owner); } - } catch (RocksDBException| IOException ex) { + return new OmDeleteVolumeResponse(volume, owner, newVolumeList); + } catch (IOException ex) { if (!(ex instanceof OMException)) { LOG.error("Delete volume failed for volume:{}", volume, ex); } - if(ex instanceof RocksDBException) { - throw RocksDBStore.toIOException("Volume creation failed.", - (RocksDBException) ex); - } else { - throw (IOException) ex; - } + throw ex; } finally { metadataManager.getLock().releaseVolumeLock(volume); metadataManager.getLock().releaseUserLock(owner); } } + @Override + public void applyDeleteVolume(String volume, String owner, + VolumeList newVolumeList) throws IOException { + try { + deleteVolumeCommitToDB(newVolumeList, volume, owner); + } catch (IOException ex) { + LOG.error("Delete volume failed for volume:{}", volume, + ex); + throw ex; + } + } + + private void deleteVolumeCommitToDB(VolumeList newVolumeList, + String volume, String owner) throws IOException { + try (BatchOperation batch = metadataManager.getStore() + .initBatchOperation()) { + String dbUserKey = metadataManager.getUserKey(owner); + if (newVolumeList.getVolumeNamesList().size() == 0) { + metadataManager.getUserTable().deleteWithBatch(batch, dbUserKey); + } else { + metadataManager.getUserTable().putWithBatch(batch, dbUserKey, + newVolumeList); + } + metadataManager.getVolumeTable().deleteWithBatch(batch, + metadataManager.getVolumeKey(volume)); + metadataManager.getStore().commitBatchOperation(batch); + } + } + /** * Checks if the specified user with a role can access this volume. * diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 2f3445af306..919709c562f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.hadoop.ozone.container.common.transport.server.ratis .ContainerStateMachine; import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; @@ -36,8 +37,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMResponse; -import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler; -import org.apache.hadoop.ozone.protocolPB.RequestHandler; +import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler; +import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl; import org.apache.hadoop.util.Time; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.Message; @@ -67,14 +68,14 @@ public class OzoneManagerStateMachine extends BaseStateMachine { new SimpleStateMachineStorage(); private final OzoneManagerRatisServer omRatisServer; private final OzoneManagerServerProtocol ozoneManager; - private RequestHandler handler; + private OzoneManagerHARequestHandler handler; private RaftGroupId raftGroupId; private long lastAppliedIndex = 0; public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) { this.omRatisServer = ratisServer; this.ozoneManager = omRatisServer.getOzoneManager(); - this.handler = new OzoneManagerRequestHandler(ozoneManager); + this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager); } /** @@ -185,21 +186,53 @@ public class OzoneManagerStateMachine extends BaseStateMachine { private TransactionContext handleStartTransactionRequests( RaftClientRequest raftClientRequest, OMRequest omRequest) { - switch (omRequest.getCmdType()) { - case AllocateBlock: - return handleAllocateBlock(raftClientRequest, omRequest); - case CreateKey: - return handleCreateKeyRequest(raftClientRequest, omRequest); - case InitiateMultiPartUpload: - return handleInitiateMultipartUpload(raftClientRequest, omRequest); - default: - return TransactionContext.newBuilder() + OMRequest newOmRequest = null; + try { + switch (omRequest.getCmdType()) { + case CreateVolume: + case SetVolumeProperty: + case DeleteVolume: + newOmRequest = handler.handleStartTransaction(omRequest); + break; + case AllocateBlock: + return handleAllocateBlock(raftClientRequest, omRequest); + case CreateKey: + return handleCreateKeyRequest(raftClientRequest, omRequest); + case InitiateMultiPartUpload: + return handleInitiateMultipartUpload(raftClientRequest, omRequest); + default: + return TransactionContext.newBuilder() + .setClientRequest(raftClientRequest) + .setStateMachine(this) + .setServerRole(RaftProtos.RaftPeerRole.LEADER) + .setLogData(raftClientRequest.getMessage().getContent()) + .build(); + } + } catch (IOException ex) { + TransactionContext transactionContext = TransactionContext.newBuilder() .setClientRequest(raftClientRequest) .setStateMachine(this) .setServerRole(RaftProtos.RaftPeerRole.LEADER) - .setLogData(raftClientRequest.getMessage().getContent()) .build(); + if (ex instanceof OMException) { + IOException ioException = + new IOException(ex.getMessage() + STATUS_CODE + + ((OMException) ex).getResult()); + transactionContext.setException(ioException); + } else { + transactionContext.setException(ex); + } + LOG.error("Exception in startTransaction for cmdType " + + omRequest.getCmdType(), ex); + return transactionContext; } + TransactionContext transactionContext = TransactionContext.newBuilder() + .setClientRequest(raftClientRequest) + .setStateMachine(this) + .setServerRole(RaftProtos.RaftPeerRole.LEADER) + .setLogData(OMRatisHelper.convertRequestToByteString(newOmRequest)) + .build(); + return transactionContext; } private TransactionContext handleInitiateMultipartUpload( @@ -367,7 +400,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine { * @throws ServiceException */ private Message runCommand(OMRequest request, long trxLogIndex) { - OMResponse response = handler.handle(request); + OMResponse response = handler.handleApplyTransaction(request); lastAppliedIndex = trxLogIndex; return OMRatisHelper.convertResponseToMessage(response); } @@ -394,7 +427,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine { } @VisibleForTesting - public void setHandler(RequestHandler handler) { + public void setHandler(OzoneManagerHARequestHandler handler) { this.handler = handler; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java new file mode 100644 index 00000000000..1ccac3bedc3 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; + +/** + * Handler to handle OM requests in OM HA. + */ +public interface OzoneManagerHARequestHandler extends RequestHandler { + + /** + * Handle start Transaction Requests from OzoneManager StateMachine. + * @param omRequest + * @return OMRequest - New OM Request which will be applied during apply + * Transaction + * @throws IOException + */ + OMRequest handleStartTransaction(OMRequest omRequest) throws IOException; + + /** + * Handle Apply Transaction Requests from OzoneManager StateMachine. + * @param omRequest + * @return OMResponse + */ + OMResponse handleApplyTransaction(OMRequest omRequest); + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java new file mode 100644 index 00000000000..aada6e1dd6e --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .CreateVolumeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .CreateVolumeResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .DeleteVolumeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .DeleteVolumeResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .SetVolumePropertyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .SetVolumePropertyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .Status; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .Type; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .VolumeInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .VolumeList; + +/** + * Command Handler for OM requests. OM State Machine calls this handler for + * deserializing the client request and sending it to OM. + */ +public class OzoneManagerHARequestHandlerImpl + extends OzoneManagerRequestHandler implements OzoneManagerHARequestHandler { + + public OzoneManagerHARequestHandlerImpl(OzoneManagerServerProtocol om) { + super(om); + } + + @Override + public OMRequest handleStartTransaction(OMRequest omRequest) + throws IOException { + LOG.debug("Received OMRequest: {}, ", omRequest); + Type cmdType = omRequest.getCmdType(); + OMRequest newOmRequest = null; + switch (cmdType) { + case CreateVolume: + newOmRequest = handleCreateVolumeStart(omRequest); + break; + case SetVolumeProperty: + newOmRequest = handleSetVolumePropertyStart(omRequest); + break; + case DeleteVolume: + newOmRequest = handleDeleteVolumeStart(omRequest); + break; + default: + throw new IOException("Unrecognized Command Type:" + cmdType); + } + return newOmRequest; + } + + + @Override + public OMResponse handleApplyTransaction(OMRequest omRequest) { + LOG.debug("Received OMRequest: {}, ", omRequest); + Type cmdType = omRequest.getCmdType(); + OMResponse.Builder responseBuilder = + OMResponse.newBuilder().setCmdType(cmdType) + .setStatus(Status.OK); + try { + switch (cmdType) { + case CreateVolume: + responseBuilder.setCreateVolumeResponse( + handleCreateVolumeApply(omRequest)); + break; + case SetVolumeProperty: + responseBuilder.setSetVolumePropertyResponse( + handleSetVolumePropertyApply(omRequest)); + break; + case DeleteVolume: + responseBuilder.setDeleteVolumeResponse( + handleDeleteVolumeApply(omRequest)); + break; + default: + // As all request types are not changed so we need to call handle + // here. + return handle(omRequest); + } + responseBuilder.setSuccess(true); + } catch (IOException ex) { + responseBuilder.setSuccess(false); + responseBuilder.setStatus(exceptionToResponseStatus(ex)); + if (ex.getMessage() != null) { + responseBuilder.setMessage(ex.getMessage()); + } + } + return responseBuilder.build(); + } + + + private OMRequest handleCreateVolumeStart(OMRequest omRequest) + throws IOException { + VolumeInfo volumeInfo = omRequest.getCreateVolumeRequest().getVolumeInfo(); + OzoneManagerProtocolProtos.VolumeList volumeList = + getOzoneManagerServerProtocol().startCreateVolume( + OmVolumeArgs.getFromProtobuf(volumeInfo)); + + CreateVolumeRequest createVolumeRequest = + CreateVolumeRequest.newBuilder().setVolumeInfo(volumeInfo) + .setVolumeList(volumeList).build(); + return omRequest.toBuilder().setCreateVolumeRequest(createVolumeRequest) + .build(); + } + + private CreateVolumeResponse handleCreateVolumeApply(OMRequest omRequest) + throws IOException { + OzoneManagerProtocolProtos.VolumeInfo volumeInfo = + omRequest.getCreateVolumeRequest().getVolumeInfo(); + VolumeList volumeList = + omRequest.getCreateVolumeRequest().getVolumeList(); + getOzoneManagerServerProtocol().applyCreateVolume( + OmVolumeArgs.getFromProtobuf(volumeInfo), + volumeList); + return CreateVolumeResponse.newBuilder().build(); + } + + private OMRequest handleSetVolumePropertyStart(OMRequest omRequest) + throws IOException { + SetVolumePropertyRequest setVolumePropertyRequest = + omRequest.getSetVolumePropertyRequest(); + String volume = setVolumePropertyRequest.getVolumeName(); + OMRequest newOmRequest = null; + if (setVolumePropertyRequest.hasQuotaInBytes()) { + long quota = setVolumePropertyRequest.getQuotaInBytes(); + OmVolumeArgs omVolumeArgs = + getOzoneManagerServerProtocol().startSetQuota(volume, quota); + SetVolumePropertyRequest newSetVolumePropertyRequest = + SetVolumePropertyRequest.newBuilder().setVolumeName(volume) + .setVolumeInfo(omVolumeArgs.getProtobuf()).build(); + newOmRequest = + omRequest.toBuilder().setSetVolumePropertyRequest( + newSetVolumePropertyRequest).build(); + } else { + String owner = setVolumePropertyRequest.getOwnerName(); + OmVolumeOwnerChangeResponse omVolumeOwnerChangeResponse = + getOzoneManagerServerProtocol().startSetOwner(volume, owner); + // If volumeLists become large and as ratis writes the request to disk we + // might take more space if the lists become very big in size. We might + // need to revisit this if it becomes problem + SetVolumePropertyRequest newSetVolumePropertyRequest = + SetVolumePropertyRequest.newBuilder().setVolumeName(volume) + .setOwnerName(owner) + .setOriginalOwner(omVolumeOwnerChangeResponse.getOriginalOwner()) + .setNewOwnerVolumeList( + omVolumeOwnerChangeResponse.getNewOwnerVolumeList()) + .setOldOwnerVolumeList( + omVolumeOwnerChangeResponse.getOriginalOwnerVolumeList()) + .setVolumeInfo( + omVolumeOwnerChangeResponse.getNewOwnerVolumeArgs() + .getProtobuf()).build(); + newOmRequest = + omRequest.toBuilder().setSetVolumePropertyRequest( + newSetVolumePropertyRequest).build(); + } + return newOmRequest; + } + + + private SetVolumePropertyResponse handleSetVolumePropertyApply( + OMRequest omRequest) throws IOException { + SetVolumePropertyRequest setVolumePropertyRequest = + omRequest.getSetVolumePropertyRequest(); + + if (setVolumePropertyRequest.hasQuotaInBytes()) { + getOzoneManagerServerProtocol().applySetQuota( + OmVolumeArgs.getFromProtobuf( + setVolumePropertyRequest.getVolumeInfo())); + } else { + getOzoneManagerServerProtocol().applySetOwner( + setVolumePropertyRequest.getOriginalOwner(), + setVolumePropertyRequest.getOldOwnerVolumeList(), + setVolumePropertyRequest.getNewOwnerVolumeList(), + OmVolumeArgs.getFromProtobuf( + setVolumePropertyRequest.getVolumeInfo())); + } + return SetVolumePropertyResponse.newBuilder().build(); + } + + private OMRequest handleDeleteVolumeStart(OMRequest omRequest) + throws IOException { + DeleteVolumeRequest deleteVolumeRequest = + omRequest.getDeleteVolumeRequest(); + + String volume = deleteVolumeRequest.getVolumeName(); + + OmDeleteVolumeResponse omDeleteVolumeResponse = + getOzoneManagerServerProtocol().startDeleteVolume(volume); + + DeleteVolumeRequest newDeleteVolumeRequest = + DeleteVolumeRequest.newBuilder().setVolumeList( + omDeleteVolumeResponse.getUpdatedVolumeList()) + .setVolumeName(omDeleteVolumeResponse.getVolume()) + .setOwner(omDeleteVolumeResponse.getOwner()).build(); + + return omRequest.toBuilder().setDeleteVolumeRequest( + newDeleteVolumeRequest).build(); + + } + + + private DeleteVolumeResponse handleDeleteVolumeApply(OMRequest omRequest) + throws IOException { + + DeleteVolumeRequest deleteVolumeRequest = + omRequest.getDeleteVolumeRequest(); + + getOzoneManagerServerProtocol().applyDeleteVolume( + deleteVolumeRequest.getVolumeName(), deleteVolumeRequest.getOwner(), + deleteVolumeRequest.getVolumeList()); + + return DeleteVolumeResponse.newBuilder().build(); + } + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java index 04b4881245b..1200d17d463 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java @@ -364,7 +364,7 @@ public class OzoneManagerRequestHandler implements RequestHandler { } // Convert and exception to corresponding status code - private Status exceptionToResponseStatus(IOException ex) { + protected Status exceptionToResponseStatus(IOException ex) { if (ex instanceof OMException) { return Status.values()[((OMException) ex).getResult().ordinal()]; } else { @@ -1027,4 +1027,8 @@ public class OzoneManagerRequestHandler implements RequestHandler { .setKeyInfo(impl.lookupFile(omKeyArgs).getProtobuf()) .build(); } + + protected OzoneManagerServerProtocol getOzoneManagerServerProtocol() { + return impl; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java index 367efd4ffda..f19dc48023b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java @@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos. */ public interface RequestHandler { - /** * Handle the OmRequest, and returns OmResponse. * @param request @@ -36,6 +35,7 @@ public interface RequestHandler { */ OMResponse handle(OMRequest request); + /** * Validates that the incoming OM request has required parameters. * TODO: Add more validation checks before writing the request to Ratis log. diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java index 9613582fb7e..b14eac74a77 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java @@ -39,8 +39,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMResponse; -import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler; -import org.apache.hadoop.ozone.protocolPB.RequestHandler; +import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler; +import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.Message; @@ -72,7 +72,7 @@ public class TestOzoneManagerStateMachine { private OzoneConfiguration conf; private OzoneManagerRatisServer omRatisServer; private String omID; - private RequestHandler requestHandler; + private OzoneManagerHARequestHandler requestHandler; private RaftGroupId raftGroupId; private OzoneManagerStateMachine ozoneManagerStateMachine; @@ -105,7 +105,7 @@ public class TestOzoneManagerStateMachine { ozoneManagerStateMachine = new OzoneManagerStateMachine(omRatisServer); - requestHandler = Mockito.mock(OzoneManagerRequestHandler.class); + requestHandler = Mockito.mock(OzoneManagerHARequestHandlerImpl.class); raftGroupId = omRatisServer.getRaftGroup().getGroupId(); ozoneManagerStateMachine.setHandler(requestHandler);