diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index f3c08197898..8872adcb429 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -32,12 +32,14 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.Objects; +import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -386,6 +388,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private ArnResource accessPoint; + /** + * A cache of files that should be deleted when the FileSystem is closed + * or the JVM is exited. + */ + private final Set deleteOnExit = new TreeSet<>(); + /** Add any deprecated keys. */ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -3063,6 +3071,24 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, @AuditEntryPoint public boolean delete(Path f, boolean recursive) throws IOException { checkNotClosed(); + return deleteWithoutCloseCheck(f, recursive); + } + + /** + * Same as delete(), except that it does not check if fs is closed. + * + * @param f the path to delete. + * @param recursive if path is a directory and set to + * true, the directory is deleted else throws an exception. In + * case of a file the recursive can be set to either true or false. + * @return true if the path existed and then was deleted; false if there + * was no path in the first place, or the corner cases of root path deletion + * have surfaced. + * @throws IOException due to inability to delete a directory or file. + */ + + @VisibleForTesting + protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException { final Path path = qualify(f); // span covers delete, getFileStatus, fake directory operations. try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(), @@ -3804,6 +3830,61 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, } } + /** + * This override bypasses checking for existence. + * + * @param f the path to delete; this may be unqualified. + * @return true, always. * @param f the path to delete. + * @return true if deleteOnExit is successful, otherwise false. + * @throws IOException IO failure + */ + @Override + public boolean deleteOnExit(Path f) throws IOException { + Path qualifedPath = makeQualified(f); + synchronized (deleteOnExit) { + deleteOnExit.add(qualifedPath); + } + return true; + } + + /** + * Cancel the scheduled deletion of the path when the FileSystem is closed. + * @param f the path to cancel deletion + * @return true if the path was found in the delete-on-exit list. + */ + @Override + public boolean cancelDeleteOnExit(Path f) { + Path qualifedPath = makeQualified(f); + synchronized (deleteOnExit) { + return deleteOnExit.remove(qualifedPath); + } + } + + /** + * Delete all paths that were marked as delete-on-exit. This recursively + * deletes all files and directories in the specified paths. It does not + * check if file exists and filesystem is closed. + * + * The time to process this operation is {@code O(paths)}, with the actual + * time dependent on the time for existence and deletion operations to + * complete, successfully or not. + */ + @Override + protected void processDeleteOnExit() { + synchronized (deleteOnExit) { + for (Iterator iter = deleteOnExit.iterator(); iter.hasNext();) { + Path path = iter.next(); + try { + deleteWithoutCloseCheck(path, true); + } catch (IOException e) { + LOG.info("Ignoring failure to deleteOnExit for path {}", path); + LOG.debug("The exception for deleteOnExit is {}", e); + } + iter.remove(); + } + } + } + /** * Close the filesystem. This shuts down all transfers. * @throws IOException IO problem diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADeleteOnExit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADeleteOnExit.java new file mode 100644 index 00000000000..31c58de629b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADeleteOnExit.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.s3a; + +import org.junit.Test; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; + +/** + * Test deleteOnExit for S3A. + * The following cases for deleteOnExit are tested: + * 1. A nonexist file, which is added to deleteOnExit set. + * 2. An existing file + * 3. A file is added to deleteOnExist set first, then created. + * 4. A directory with some files under it. + */ +public class ITestS3ADeleteOnExit extends AbstractS3ATestBase { + + private static final String PARENT_DIR_PATH_STR = "testDeleteOnExitDir"; + private static final String NON_EXIST_FILE_PATH_STR = + PARENT_DIR_PATH_STR + "/nonExistFile"; + private static final String INORDER_FILE_PATH_STR = + PARENT_DIR_PATH_STR + "/inOrderFile"; + private static final String OUT_OF_ORDER_FILE_PATH_STR = + PARENT_DIR_PATH_STR + "/outOfOrderFile"; + private static final String SUBDIR_PATH_STR = + PARENT_DIR_PATH_STR + "/subDir"; + private static final String FILE_UNDER_SUBDIR_PATH_STR = + SUBDIR_PATH_STR + "/subDirFile"; + + @Test + public void testDeleteOnExit() throws Exception { + FileSystem fs = getFileSystem(); + + // Get a new filesystem object which is same as fs. + FileSystem s3aFs = new S3AFileSystem(); + s3aFs.initialize(fs.getUri(), fs.getConf()); + Path nonExistFilePath = path(NON_EXIST_FILE_PATH_STR); + Path inOrderFilePath = path(INORDER_FILE_PATH_STR); + Path outOfOrderFilePath = path(OUT_OF_ORDER_FILE_PATH_STR); + Path subDirPath = path(SUBDIR_PATH_STR); + Path fileUnderSubDirPath = path(FILE_UNDER_SUBDIR_PATH_STR); + + // 1. set up the test directory. + Path dir = path("testDeleteOnExitDir"); + s3aFs.mkdirs(dir); + + // 2. Add a nonexisting file to DeleteOnExit set. + s3aFs.deleteOnExit(nonExistFilePath); + assertPathDoesNotExist("File " + NON_EXIST_FILE_PATH_STR + " should not exist", + nonExistFilePath); + + // 3. create a file and then add it to DeleteOnExit set. + byte[] data = dataset(16, 'a', 26); + createFile(s3aFs, inOrderFilePath, true, data); + assertPathExists("File " + INORDER_FILE_PATH_STR + " should exist", inOrderFilePath); + s3aFs.deleteOnExit(inOrderFilePath); + + // 4. add a path to DeleteOnExit set first, then create it. + s3aFs.deleteOnExit(outOfOrderFilePath); + createFile(s3aFs, outOfOrderFilePath, true, data); + assertPathExists("File " + OUT_OF_ORDER_FILE_PATH_STR + " should exist", + outOfOrderFilePath); + + // 5. create a subdirectory, a file under it, and add subdirectory DeleteOnExit set. + s3aFs.mkdirs(subDirPath); + s3aFs.deleteOnExit(subDirPath); + createFile(s3aFs, fileUnderSubDirPath, true, data); + assertPathExists("Directory " + SUBDIR_PATH_STR + " should exist", subDirPath); + assertPathExists("File " + FILE_UNDER_SUBDIR_PATH_STR + " should exist", + fileUnderSubDirPath); + + s3aFs.close(); + + // After s3aFs is closed, make sure that all files/directories in deleteOnExit + // set are deleted. + assertPathDoesNotExist("File " + NON_EXIST_FILE_PATH_STR + " should not exist", + nonExistFilePath); + assertPathDoesNotExist("File " + INORDER_FILE_PATH_STR + " should not exist", + inOrderFilePath); + assertPathDoesNotExist("File " + OUT_OF_ORDER_FILE_PATH_STR + " should not exist", + outOfOrderFilePath); + assertPathDoesNotExist("Directory " + SUBDIR_PATH_STR + " should not exist", + subDirPath); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java new file mode 100644 index 00000000000..62a99d72092 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.URI; +import java.util.Date; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.GetObjectMetadataRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +/** + * deleteOnExit test for S3A. + */ +public class TestS3ADeleteOnExit extends AbstractS3AMockTest { + + static class TestS3AFileSystem extends S3AFileSystem { + private int deleteOnDnExitCount; + + public int getDeleteOnDnExitCount() { + return deleteOnDnExitCount; + } + + @Override + public boolean deleteOnExit(Path f) throws IOException { + deleteOnDnExitCount++; + return super.deleteOnExit(f); + } + + // This is specifically designed for deleteOnExit processing. + // In this specific case, deleteWithoutCloseCheck() will only be called in the path of + // processDeleteOnExit. + @Override + protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException { + boolean result = super.deleteWithoutCloseCheck(f, recursive); + deleteOnDnExitCount--; + return result; + } + } + + @Test + public void testDeleteOnExit() throws Exception { + Configuration conf = createConfiguration(); + TestS3AFileSystem testFs = new TestS3AFileSystem(); + URI uri = URI.create(FS_S3A + "://" + BUCKET); + // unset S3CSE property from config to avoid pathIOE. + conf.unset(Constants.S3_ENCRYPTION_ALGORITHM); + testFs.initialize(uri, conf); + AmazonS3 testS3 = testFs.getAmazonS3ClientForTesting("mocking"); + + Path path = new Path("/file"); + String key = path.toUri().getPath().substring(1); + ObjectMetadata meta = new ObjectMetadata(); + meta.setContentLength(1L); + meta.setLastModified(new Date(2L)); + when(testS3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key)))) + .thenReturn(meta); + + testFs.deleteOnExit(path); + testFs.close(); + assertEquals(0, testFs.getDeleteOnDnExitCount()); + } + + private ArgumentMatcher correctGetMetadataRequest( + String bucket, String key) { + return request -> request != null + && request.getBucketName().equals(bucket) + && request.getKey().equals(key); + } +}