diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java index 74ec65d0fb7..ebcc263a52a 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java @@ -227,6 +227,7 @@ public final class OmUtils { case RemoveAcl: case SetAcl: case AddAcl: + case PurgeKeys: return false; default: LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType); @@ -470,4 +471,15 @@ public final class OmUtils { } return dirFile; } + + /** + * Returns the DB key name of a deleted key in OM metadata store. The + * deleted key name is the _. + * @param key Original key name + * @param timestamp timestamp of deletion + * @return Deleted key name + */ + public static String getDeletedKeyName(String key, long timestamp) { + return key + "_" + timestamp; + } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java index 11abb21eace..908a95bfaea 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java @@ -37,6 +37,8 @@ public enum OMAction implements AuditAction { UPDATE_VOLUME, UPDATE_BUCKET, UPDATE_KEY, + PURGE_KEYS, + // READ Actions CHECK_VOLUME_ACCESS, LIST_BUCKETS, diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 2765102efa6..cc20e861c0b 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -90,6 +90,7 @@ enum Type { SetAcl = 77; GetAcl = 78; + PurgeKeys = 81; } message OMRequest { @@ -125,7 +126,6 @@ message OMRequest { optional CommitKeyRequest commitKeyRequest = 36; optional AllocateBlockRequest allocateBlockRequest = 37; - optional S3CreateBucketRequest createS3BucketRequest = 41; optional S3DeleteBucketRequest deleteS3BucketRequest = 42; optional S3BucketInfoRequest infoS3BucketRequest = 43; @@ -153,6 +153,8 @@ message OMRequest { optional RemoveAclRequest removeAclRequest = 76; optional SetAclRequest setAclRequest = 77; optional GetAclRequest getAclRequest = 78; + + optional PurgeKeysRequest purgeKeysRequest = 81; } message OMResponse { @@ -217,6 +219,8 @@ message OMResponse { optional RemoveAclResponse removeAclResponse = 76; optional SetAclResponse setAclResponse = 77; optional GetAclResponse getAclResponse = 78; + + optional PurgeKeysResponse purgeKeysResponse = 81; } enum Status { @@ -762,6 +766,14 @@ message DeleteKeyResponse { optional uint64 openVersion = 4; } +message PurgeKeysRequest { + repeated string keys = 1; +} + +message PurgeKeysResponse { + +} + message OMTokenProto { enum Type { DELEGATION_TOKEN = 1; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java new file mode 100644 index 00000000000..732fb3445ad --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; + +/** + * Test OM's {@link KeyDeletingService}. + */ +public class TestKeyPurging { + + private static MiniOzoneCluster cluster; + private static ObjectStore store; + private static OzoneManager om; + + private static final int NUM_KEYS = 10; + private static final int KEY_SIZE = 100; + + @Before + public void setup() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, + TimeUnit.MILLISECONDS); + conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); + conf.setQuietMode(false); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(1) + .setHbInterval(200) + .build(); + cluster.waitForClusterToBeReady(); + store = OzoneClientFactory.getRpcClient(conf).getObjectStore(); + om = cluster.getOzoneManager(); + } + + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test(timeout = 30000) + public void testKeysPurgingByKeyDeletingService() throws Exception { + // Create Volume and Bucket + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + // Create some keys and write data into them + String keyBase = UUID.randomUUID().toString(); + String keyString = UUID.randomUUID().toString(); + byte[] data = ContainerTestHelper.getFixedLengthString( + keyString, KEY_SIZE).getBytes(UTF_8); + List keys = new ArrayList<>(NUM_KEYS); + for (int i = 1; i <= NUM_KEYS; i++) { + String keyName = keyBase + "-" + i; + keys.add(keyName); + OzoneOutputStream keyStream = ContainerTestHelper.createKey( + keyName, ReplicationType.STAND_ALONE, ReplicationFactor.ONE, + KEY_SIZE, store, volumeName, bucketName); + keyStream.write(data); + keyStream.close(); + } + + // Delete created keys + for (String key : keys) { + bucket.deleteKey(key); + } + + // Verify that KeyDeletingService picks up deleted keys and purges them + // from DB. + KeyManager keyManager = om.getKeyManager(); + KeyDeletingService keyDeletingService = + (KeyDeletingService) keyManager.getDeletingService(); + + GenericTestUtils.waitFor( + () -> keyDeletingService.getDeletedKeyCount().get() >= NUM_KEYS, + 1000, 10000); + + Assert.assertTrue(keyDeletingService.getRunCount().get() > 1); + + GenericTestUtils.waitFor( + () -> { + try { + return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE) + .size() == 0; + } catch (IOException e) { + return false; + } + }, 1000, 10000); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java index 9c1dc2db2e7..8be0735330e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java @@ -17,27 +17,34 @@ package org.apache.hadoop.ozone.om; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.hadoop.util.Time; import org.apache.hadoop.utils.BackgroundService; import org.apache.hadoop.utils.BackgroundTask; import org.apache.hadoop.utils.BackgroundTaskQueue; import org.apache.hadoop.utils.BackgroundTaskResult; import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult; -import org.apache.hadoop.utils.db.BatchOperation; -import org.apache.hadoop.utils.db.DBStore; -import org.apache.hadoop.utils.db.Table; import com.google.common.annotations.VisibleForTesting; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT; + +import org.apache.hadoop.utils.db.BatchOperation; +import org.apache.hadoop.utils.db.DBStore; +import org.apache.hadoop.utils.db.Table; +import org.apache.ratis.protocol.ClientId; import org.rocksdb.RocksDBException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,17 +62,21 @@ public class KeyDeletingService extends BackgroundService { // The thread pool size for key deleting service. private final static int KEY_DELETING_CORE_POOL_SIZE = 2; + private final OzoneManager ozoneManager; private final ScmBlockLocationProtocol scmClient; private final KeyManager manager; + private ClientId clientId = ClientId.randomId(); private final int keyLimitPerTask; private final AtomicLong deletedKeyCount; private final AtomicLong runCount; - public KeyDeletingService(ScmBlockLocationProtocol scmClient, + KeyDeletingService(OzoneManager ozoneManager, + ScmBlockLocationProtocol scmClient, KeyManager manager, long serviceInterval, long serviceTimeout, Configuration conf) { super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS, KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); + this.ozoneManager = ozoneManager; this.scmClient = scmClient; this.manager = manager; this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK, @@ -101,6 +112,21 @@ public class KeyDeletingService extends BackgroundService { return queue; } + private boolean shouldRun() { + if (ozoneManager == null) { + // OzoneManager can be null for testing + return true; + } + return ozoneManager.isLeader(); + } + + private boolean isRatisEnabled() { + if (ozoneManager == null) { + return false; + } + return ozoneManager.isRatisEnabled(); + } + /** * A key deleting task scans OM DB and looking for a certain number of * pending-deletion keys, sends these keys along with their associated blocks @@ -118,26 +144,38 @@ public class KeyDeletingService extends BackgroundService { @Override public BackgroundTaskResult call() throws Exception { - runCount.incrementAndGet(); - try { - long startTime = Time.monotonicNow(); - List keyBlocksList = manager - .getPendingDeletionKeys(keyLimitPerTask); - if (keyBlocksList != null && keyBlocksList.size() > 0) { - List results = - scmClient.deleteKeyBlocks(keyBlocksList); - if (results != null) { - int delCount = deleteAllKeys(results); - LOG.debug("Number of keys deleted: {}, elapsed time: {}ms", - delCount, Time.monotonicNow() - startTime); - deletedKeyCount.addAndGet(delCount); + // Check if this is the Leader OM. If not leader, no need to execute this + // task. + if (shouldRun()) { + runCount.incrementAndGet(); + try { + long startTime = Time.monotonicNow(); + List keyBlocksList = manager + .getPendingDeletionKeys(keyLimitPerTask); + if (keyBlocksList != null && keyBlocksList.size() > 0) { + List results = + scmClient.deleteKeyBlocks(keyBlocksList); + if (results != null) { + int delCount; + if (isRatisEnabled()) { + delCount = submitPurgeKeysRequest(results); + } else { + // TODO: Once HA and non-HA paths are merged, we should have + // only one code path here. Purge keys should go through an + // OMRequest model. + delCount = deleteAllKeys(results); + } + LOG.debug("Number of keys deleted: {}, elapsed time: {}ms", + delCount, Time.monotonicNow() - startTime); + deletedKeyCount.addAndGet(delCount); + } } + } catch (IOException e) { + LOG.error("Error while running delete keys background task. Will " + + "retry at next run.", e); } - } catch (IOException e) { - LOG.error("Error while running delete keys background task. Will " + - "retry at next run.", e); } - // By desing, no one cares about the results of this call back. + // By design, no one cares about the results of this call back. return EmptyTaskResult.newResult(); } @@ -171,5 +209,48 @@ public class KeyDeletingService extends BackgroundService { } return deletedCount; } + + /** + * Submits PurgeKeys request for the keys whose blocks have been deleted + * by SCM. + * + * @param results DeleteBlockGroups returned by SCM. + * @throws IOException on Error + */ + public int submitPurgeKeysRequest(List results) { + List purgeKeysList = new ArrayList<>(); + + // Put all keys to be purged in a list + int deletedCount = 0; + for (DeleteBlockGroupResult result : results) { + if (result.isSuccess()) { + // Add key to PurgeKeys list. + String deletedKey = result.getObjectKey(); + purgeKeysList.add(deletedKey); + LOG.debug("Key {} set to be purged from OM DB", deletedKey); + deletedCount++; + } + } + + PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder() + .addAllKeys(purgeKeysList) + .build(); + + OMRequest omRequest = OMRequest.newBuilder() + .setCmdType(Type.PurgeKeys) + .setPurgeKeysRequest(purgeKeysRequest) + .setClientId(clientId.toString()) + .build(); + + // Submit PurgeKeys request to OM + try { + ozoneManager.getOmServerProtocol().submitRequest(null, omRequest); + } catch (ServiceException e) { + LOG.error("PurgeKey request failed. Will retry at next run."); + return 0; + } + + return deletedCount; + } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index d731413c7ff..7db4a80ebf3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -35,6 +35,7 @@ import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.protobuf.ByteString; import org.apache.commons.codec.digest.DigestUtils; @@ -138,6 +139,7 @@ public class KeyManagerImpl implements KeyManager { /** * A SCM block client, used to talk to SCM to allocate block during putKey. */ + private final OzoneManager ozoneManager; private final ScmClient scmClient; private final OMMetadataManager metadataManager; private final long scmBlockSize; @@ -154,21 +156,25 @@ public class KeyManagerImpl implements KeyManager { private final PrefixManager prefixManager; + @VisibleForTesting public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, OMMetadataManager metadataManager, OzoneConfiguration conf, String omId, OzoneBlockTokenSecretManager secretManager) { - this(new ScmClient(scmBlockClient, null), metadataManager, + this(null, new ScmClient(scmBlockClient, null), metadataManager, conf, omId, secretManager, null, null); } - public KeyManagerImpl(ScmClient scmClient, + public KeyManagerImpl(OzoneManager om, ScmClient scmClient, + OzoneConfiguration conf, String omId) { + this (om, scmClient, om.getMetadataManager(), conf, omId, + om.getBlockTokenMgr(), om.getKmsProvider(), om.getPrefixManager()); + } + + @SuppressWarnings("parameternumber") + private KeyManagerImpl(OzoneManager om, ScmClient scmClient, OMMetadataManager metadataManager, OzoneConfiguration conf, String omId, OzoneBlockTokenSecretManager secretManager, - KeyProviderCryptoExtension kmsProvider, - PrefixManager prefixManager) { - this.scmClient = scmClient; - this.metadataManager = metadataManager; - this.prefixManager = prefixManager; + KeyProviderCryptoExtension kmsProvider, PrefixManager prefixManager) { this.scmBlockSize = (long) conf .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES); @@ -177,13 +183,19 @@ public class KeyManagerImpl implements KeyManager { this.preallocateBlocksMax = conf.getInt( OZONE_KEY_PREALLOCATION_BLOCKS_MAX, OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT); - this.omId = omId; - start(conf); - this.secretManager = secretManager; this.grpcBlockTokenEnabled = conf.getBoolean( HDDS_BLOCK_TOKEN_ENABLED, HDDS_BLOCK_TOKEN_ENABLED_DEFAULT); + + this.ozoneManager = om; + this.omId = omId; + this.scmClient = scmClient; + this.metadataManager = metadataManager; + this.prefixManager = prefixManager; + this.secretManager = secretManager; this.kmsProvider = kmsProvider; + + start(conf); } @Override @@ -197,8 +209,9 @@ public class KeyManagerImpl implements KeyManager { OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); - keyDeletingService = new KeyDeletingService(scmClient.getBlockClient(), - this, blockDeleteInterval, serviceTimeout, configuration); + keyDeletingService = new KeyDeletingService(ozoneManager, + scmClient.getBlockClient(), this, blockDeleteInterval, + serviceTimeout, configuration); keyDeletingService.start(); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 69ac2d0bd99..62ee64a9405 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -268,6 +268,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private S3SecretManager s3SecretManager; private volatile boolean isOmRpcServerRunning = false; private String omComponent; + private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol; private boolean isRatisEnabled; private OzoneManagerRatisServer omRatisServer; @@ -401,11 +402,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl omRpcServer = getRpcServer(conf); omRpcAddress = updateRPCListenAddress(configuration, OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer); + this.scmClient = new ScmClient(scmBlockClient, scmContainerClient); + prefixManager = new PrefixManagerImpl(metadataManager); - keyManager = new KeyManagerImpl(scmClient, metadataManager, - configuration, omStorage.getOmId(), blockTokenMgr, getKmsProvider(), - prefixManager); + keyManager = new KeyManagerImpl(this, scmClient, configuration, + omStorage.getOmId()); + shutdownHook = () -> { saveOmMetrics(); }; @@ -1200,6 +1203,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl public KeyProviderCryptoExtension getKmsProvider() { return kmsProvider; } + + public PrefixManager getPrefixManager() { + return prefixManager; + } + /** * Get metadata manager. * @@ -1209,6 +1217,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl return metadataManager; } + public OzoneBlockTokenSecretManager getBlockTokenMgr() { + return blockTokenMgr; + } + + public OzoneManagerProtocolServerSideTranslatorPB getOmServerProtocol() { + return omServerProtocol; + } + public OMMetrics getMetrics() { return metrics; } @@ -1328,9 +1344,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class, ProtobufRpcEngine.class); - BlockingService omService = newReflectiveBlockingService( - new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisServer, - isRatisEnabled)); + this.omServerProtocol = new OzoneManagerProtocolServerSideTranslatorPB( + this, omRatisServer, isRatisEnabled); + + BlockingService omService = newReflectiveBlockingService(omServerProtocol); + return startRpcServer(configuration, omNodeRpcAddr, OzoneManagerProtocolPB.class, omService, handlerCount); @@ -3196,4 +3214,28 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl public long getMaxUserVolumeCount() { return maxUserVolumeCount; } + + /** + * Checks the Leader status of OM Ratis Server. + * Note that this status has a small window of error. It should not be used + * to determine the absolute leader status. + * If it is the leader, the role status is cached till Ratis server + * notifies of leader change. If it is not leader, the role information is + * retrieved through by submitting a GroupInfoRequest to Ratis server. + * + * If ratis is not enabled, then it always returns true. + * + * @return Return true if this node is the leader, false otherwsie. + */ + public boolean isLeader() { + return isRatisEnabled ? omRatisServer.isLeader() : true; + } + + /** + * Return if Ratis is enabled or not. + * @return + */ + public boolean isRatisEnabled() { + return isRatisEnabled; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index 5b0dc0fb561..d9f713ed162 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequest; import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest; import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest; import org.apache.hadoop.ozone.om.request.key.OMKeyDeleteRequest; +import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest; import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest; import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest; import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest; @@ -96,6 +97,8 @@ public final class OzoneManagerRatisUtils { return new OMDirectoryCreateRequest(omRequest); case CreateFile: return new OMFileCreateRequest(omRequest); + case PurgeKeys: + return new OMKeyPurgeRequest(omRequest); default: // TODO: will update once all request types are implemented. return null; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java index 7d9275e3915..f79187e2a2c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java @@ -25,6 +25,7 @@ import com.google.common.base.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import org.apache.hadoop.ozone.audit.AuditLogger; import org.apache.hadoop.ozone.audit.OMAction; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -44,6 +45,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.apache.hadoop.util.Time; import org.apache.hadoop.utils.db.cache.CacheKey; import org.apache.hadoop.utils.db.cache.CacheValue; @@ -63,6 +65,21 @@ public class OMKeyDeleteRequest extends OMKeyRequest { super(omRequest); } + @Override + public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { + DeleteKeyRequest deleteKeyRequest = getOmRequest().getDeleteKeyRequest(); + Preconditions.checkNotNull(deleteKeyRequest); + + OzoneManagerProtocolProtos.KeyArgs keyArgs = deleteKeyRequest.getKeyArgs(); + + OzoneManagerProtocolProtos.KeyArgs.Builder newKeyArgs = + keyArgs.toBuilder().setModificationTime(Time.now()); + + return getOmRequest().toBuilder() + .setDeleteKeyRequest(deleteKeyRequest.toBuilder() + .setKeyArgs(newKeyArgs)).setUserInfo(getUserInfo()).build(); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, long transactionLogIndex) { @@ -151,12 +168,13 @@ public class OMKeyDeleteRequest extends OMKeyRequest { // return response. if (exception == null) { omMetrics.decNumKeys(); - return new OMKeyDeleteResponse(omKeyInfo, + return new OMKeyDeleteResponse( + omKeyInfo, deleteKeyArgs.getModificationTime(), omResponse.setDeleteKeyResponse( DeleteKeyResponse.newBuilder()).build()); } else { omMetrics.incNumKeyDeleteFails(); - return new OMKeyDeleteResponse(null, + return new OMKeyDeleteResponse(null, 0, createErrorOMResponse(omResponse, exception)); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java new file mode 100644 index 00000000000..5eda6768c3a --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java @@ -0,0 +1,48 @@ +package org.apache.hadoop.ozone.om.request.key; + +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Handles purging of keys from OM DB. + */ +public class OMKeyPurgeRequest extends OMKeyRequest { + + private static final Logger LOG = + LoggerFactory.getLogger(OMKeyPurgeRequest.class); + + public OMKeyPurgeRequest(OMRequest omRequest) { + super(omRequest); + } + + @Override + public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, + long transactionLogIndex) { + PurgeKeysRequest purgeKeysRequest = getOmRequest().getPurgeKeysRequest(); + List purgeKeysList = purgeKeysRequest.getKeysList(); + + LOG.debug("Processing Purge Keys for {} number of keys.", + purgeKeysList.size()); + + OMResponse omResponse = OMResponse.newBuilder() + .setCmdType(Type.PurgeKeys) + .setPurgeKeysResponse( + OzoneManagerProtocolProtos.PurgeKeysResponse.newBuilder().build()) + .setStatus(Status.OK) + .setSuccess(true) + .build(); + + return new OMKeyPurgeResponse(purgeKeysList, omResponse); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java index 8016f481727..5012035d9eb 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.om.response.key; +import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; @@ -34,10 +35,13 @@ import java.io.IOException; */ public class OMKeyDeleteResponse extends OMClientResponse { private OmKeyInfo omKeyInfo; + private long deleteTimestamp; - public OMKeyDeleteResponse(OmKeyInfo omKeyInfo, OMResponse omResponse) { + public OMKeyDeleteResponse(OmKeyInfo omKeyInfo, long deletionTime, + OMResponse omResponse) { super(omResponse); this.omKeyInfo = omKeyInfo; + this.deleteTimestamp = deletionTime; } @Override @@ -54,8 +58,14 @@ public class OMKeyDeleteResponse extends OMClientResponse { // If Key is not empty add this to delete table. if (!isKeyEmpty(omKeyInfo)) { + // If a deleted key is put in the table where a key with the same + // name already exists, then the old deleted key information would be + // lost. To differentiate between keys with same name in + // deletedTable, we add the timestamp to the key name. + String deleteKeyName = OmUtils.getDeletedKeyName( + ozoneKey, deleteTimestamp); omMetadataManager.getDeletedTable().putWithBatch(batchOperation, - ozoneKey, omKeyInfo); + deleteKeyName, omKeyInfo); } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java new file mode 100644 index 00000000000..794c0b12a0f --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java @@ -0,0 +1,36 @@ +package org.apache.hadoop.ozone.om.response.key; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.utils.db.BatchOperation; + +import java.io.IOException; +import java.util.List; + +/** + * Response for {@link OMKeyPurgeRequest} request. + */ +public class OMKeyPurgeResponse extends OMClientResponse { + + private List purgeKeyList; + + public OMKeyPurgeResponse(List keyList, OMResponse omResponse) { + super(omResponse); + this.purgeKeyList = keyList; + } + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + + if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) { + for (String key : purgeKeyList) { + omMetadataManager.getDeletedTable().deleteWithBatch(batchOperation, + key); + } + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java index c1b4bd8c1a3..4b2120ffd8e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java @@ -110,6 +110,7 @@ public class OzoneManagerHARequestHandlerImpl case RenameKey: case CreateDirectory: case CreateFile: + case PurgeKeys: //TODO: We don't need to pass transactionID, this will be removed when // complete write requests is changed to new model. And also we can // return OMClientResponse, then adding to doubleBuffer can be taken diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java index 927341a1deb..40b2bc82938 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java @@ -20,12 +20,14 @@ package org.apache.hadoop.ozone.om.request; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.UUID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -242,4 +244,21 @@ public final class TestOMRequestUtils { .setSetVolumePropertyRequest(setVolumePropertyRequest).build(); } + /** + * Deletes key from Key table and adds it to DeletedKeys table. + * @return the deletedKey name + */ + public static String deleteKey(String ozoneKey, + OMMetadataManager omMetadataManager) throws IOException { + // Retrieve the keyInfo + OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey); + + // Delete key from KeyTable and put in DeletedKeyTable + omMetadataManager.getKeyTable().delete(ozoneKey); + String deletedKeyName = OmUtils.getDeletedKeyName(ozoneKey, Time.now()); + omMetadataManager.getDeletedTable().put(deletedKeyName, omKeyInfo); + + return deletedKeyName; + } + } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java new file mode 100644 index 00000000000..573730744a9 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.request.key; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.ozone.om.request.TestOMRequestUtils; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.utils.db.BatchOperation; + +/** + * Tests {@link OMKeyPurgeRequest} and {@link OMKeyPurgeResponse}. + */ +public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest { + + private int numKeys = 10; + + /** + * Creates volume, bucket and key entries and adds to OM DB and then + * deletes these keys to move them to deletedKeys table. + */ + private List createAndDeleteKeys() throws Exception { + // Add volume, bucket and key entries to OM DB. + TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager); + + List ozoneKeyNames = new ArrayList<>(numKeys); + for (int i = 1; i <= numKeys; i++) { + String key = keyName + "-" + i; + TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, key, + clientID, replicationType, replicationFactor, omMetadataManager); + ozoneKeyNames.add( + omMetadataManager.getOzoneKey(volumeName, bucketName, key)); + } + + List deletedKeyNames = new ArrayList<>(numKeys); + for (String ozoneKey : ozoneKeyNames) { + String deletedKeyName = TestOMRequestUtils.deleteKey( + ozoneKey, omMetadataManager); + deletedKeyNames.add(deletedKeyName); + } + + return deletedKeyNames; + } + + /** + * Create OMRequest which encapsulates DeleteKeyRequest. + * @return OMRequest + */ + private OMRequest createPurgeKeysRequest(List deletedKeys) { + PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder() + .addAllKeys(deletedKeys) + .build(); + + return OMRequest.newBuilder() + .setPurgeKeysRequest(purgeKeysRequest) + .setCmdType(Type.PurgeKeys) + .setClientId(UUID.randomUUID().toString()) + .build(); + } + + @Test + public void testValidateAndUpdateCache() throws Exception { + // Create and Delete keys. The keys should be moved to DeletedKeys table + List deletedKeyNames = createAndDeleteKeys(); + + // The keys should be present in the DeletedKeys table before purging + for (String deletedKey : deletedKeyNames) { + Assert.assertTrue(omMetadataManager.getDeletedTable().isExist( + deletedKey)); + } + + // Create PurgeKeysRequest to purge the deleted keys + OMRequest omRequest = createPurgeKeysRequest(deletedKeyNames); + + OMRequest preExecutedRequest = preExecute(omRequest); + OMKeyPurgeRequest omKeyPurgeRequest = + new OMKeyPurgeRequest(preExecutedRequest); + + OMClientResponse omClientResponse = + omKeyPurgeRequest.validateAndUpdateCache(ozoneManager, 100L); + + OMResponse omResponse = OMResponse.newBuilder() + .setPurgeKeysResponse(PurgeKeysResponse.getDefaultInstance()) + .setCmdType(Type.PurgeKeys) + .setStatus(Status.OK) + .build(); + + BatchOperation batchOperation = + omMetadataManager.getStore().initBatchOperation(); + + OMKeyPurgeResponse omKeyPurgeResponse = + new OMKeyPurgeResponse(deletedKeyNames, omResponse); + omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation); + + // Do manual commit and see whether addToBatch is successful or not. + omMetadataManager.getStore().commitBatchOperation(batchOperation); + + // The keys should not exist in the DeletedKeys table + for (String deletedKey : deletedKeyNames) { + Assert.assertFalse(omMetadataManager.getDeletedTable().isExist( + deletedKey)); + } + } + + private OMRequest preExecute(OMRequest originalOmRequest) throws IOException { + OMKeyPurgeRequest omKeyPurgeRequest = + new OMKeyPurgeRequest(originalOmRequest); + + OMRequest modifiedOmRequest = omKeyPurgeRequest.preExecute(ozoneManager); + + // Will not be equal, as UserInfo will be set. + Assert.assertNotEquals(originalOmRequest, modifiedOmRequest); + + return modifiedOmRequest; + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java index 62d28f4fb0e..da96e0c1cba 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java @@ -21,7 +21,9 @@ package org.apache.hadoop.ozone.om.response.key; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.util.Time; import org.junit.Assert; import org.junit.Test; @@ -50,11 +52,15 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse { .setCmdType(OzoneManagerProtocolProtos.Type.DeleteKey) .build(); + long deletionTime = Time.now(); + OMKeyDeleteResponse omKeyDeleteResponse = - new OMKeyDeleteResponse(omKeyInfo, omResponse); + new OMKeyDeleteResponse(omKeyInfo, deletionTime, omResponse); String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName, keyName); + String deletedOzoneKeyName = OmUtils.getDeletedKeyName( + ozoneKey, deletionTime); TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, keyName, clientID, replicationType, replicationFactor, omMetadataManager); @@ -69,7 +75,8 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse { // As default key entry does not have any blocks, it should not be in // deletedKeyTable. - Assert.assertFalse(omMetadataManager.getDeletedTable().isExist(ozoneKey)); + Assert.assertFalse(omMetadataManager.getDeletedTable().isExist( + deletedOzoneKeyName)); } @Test @@ -110,9 +117,13 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse { .setStatus(OzoneManagerProtocolProtos.Status.OK) .setCmdType(OzoneManagerProtocolProtos.Type.DeleteKey) .build(); + long deletionTime = Time.now(); OMKeyDeleteResponse omKeyDeleteResponse = - new OMKeyDeleteResponse(omKeyInfo, omResponse); + new OMKeyDeleteResponse(omKeyInfo, deletionTime, omResponse); + + String deletedOzoneKeyName = OmUtils.getDeletedKeyName( + ozoneKey, deletionTime); Assert.assertTrue(omMetadataManager.getKeyTable().isExist(ozoneKey)); omKeyDeleteResponse.addToDBBatch(omMetadataManager, batchOperation); @@ -123,7 +134,8 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse { Assert.assertFalse(omMetadataManager.getKeyTable().isExist(ozoneKey)); // Key has blocks, it should not be in deletedKeyTable. - Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(ozoneKey)); + Assert.assertTrue(omMetadataManager.getDeletedTable().isExist( + deletedOzoneKeyName)); } @@ -140,7 +152,7 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse { .build(); OMKeyDeleteResponse omKeyDeleteResponse = - new OMKeyDeleteResponse(omKeyInfo, omResponse); + new OMKeyDeleteResponse(omKeyInfo, Time.now(), omResponse); String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);