HADOOP-18340. deleteOnExit does not work with S3AFileSystem (#4608)

Contributed by Huaxiang Sun
This commit is contained in:
huaxiangsun 2022-08-11 12:25:13 -07:00 committed by Steve Loughran
parent a0e2ab2974
commit 1b9135e3b5
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
3 changed files with 284 additions and 0 deletions

View File

@ -32,12 +32,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@ -386,6 +388,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private ArnResource accessPoint;
/**
* A cache of files that should be deleted when the FileSystem is closed
* or the JVM is exited.
*/
private final Set<Path> deleteOnExit = new TreeSet<>();
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@ -3063,6 +3071,24 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
@AuditEntryPoint
public boolean delete(Path f, boolean recursive) throws IOException {
checkNotClosed();
return deleteWithoutCloseCheck(f, recursive);
}
/**
* Same as delete(), except that it does not check if fs is closed.
*
* @param f the path to delete.
* @param recursive if path is a directory and set to
* true, the directory is deleted else throws an exception. In
* case of a file the recursive can be set to either true or false.
* @return true if the path existed and then was deleted; false if there
* was no path in the first place, or the corner cases of root path deletion
* have surfaced.
* @throws IOException due to inability to delete a directory or file.
*/
@VisibleForTesting
protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
final Path path = qualify(f);
// span covers delete, getFileStatus, fake directory operations.
try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(),
@ -3804,6 +3830,61 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
}
/**
* This override bypasses checking for existence.
*
* @param f the path to delete; this may be unqualified.
* @return true, always. * @param f the path to delete.
* @return true if deleteOnExit is successful, otherwise false.
* @throws IOException IO failure
*/
@Override
public boolean deleteOnExit(Path f) throws IOException {
Path qualifedPath = makeQualified(f);
synchronized (deleteOnExit) {
deleteOnExit.add(qualifedPath);
}
return true;
}
/**
* Cancel the scheduled deletion of the path when the FileSystem is closed.
* @param f the path to cancel deletion
* @return true if the path was found in the delete-on-exit list.
*/
@Override
public boolean cancelDeleteOnExit(Path f) {
Path qualifedPath = makeQualified(f);
synchronized (deleteOnExit) {
return deleteOnExit.remove(qualifedPath);
}
}
/**
* Delete all paths that were marked as delete-on-exit. This recursively
* deletes all files and directories in the specified paths. It does not
* check if file exists and filesystem is closed.
*
* The time to process this operation is {@code O(paths)}, with the actual
* time dependent on the time for existence and deletion operations to
* complete, successfully or not.
*/
@Override
protected void processDeleteOnExit() {
synchronized (deleteOnExit) {
for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
Path path = iter.next();
try {
deleteWithoutCloseCheck(path, true);
} catch (IOException e) {
LOG.info("Ignoring failure to deleteOnExit for path {}", path);
LOG.debug("The exception for deleteOnExit is {}", e);
}
iter.remove();
}
}
}
/**
* Close the filesystem. This shuts down all transfers.
* @throws IOException IO problem

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.junit.Test;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
/**
* Test deleteOnExit for S3A.
* The following cases for deleteOnExit are tested:
* 1. A nonexist file, which is added to deleteOnExit set.
* 2. An existing file
* 3. A file is added to deleteOnExist set first, then created.
* 4. A directory with some files under it.
*/
public class ITestS3ADeleteOnExit extends AbstractS3ATestBase {
private static final String PARENT_DIR_PATH_STR = "testDeleteOnExitDir";
private static final String NON_EXIST_FILE_PATH_STR =
PARENT_DIR_PATH_STR + "/nonExistFile";
private static final String INORDER_FILE_PATH_STR =
PARENT_DIR_PATH_STR + "/inOrderFile";
private static final String OUT_OF_ORDER_FILE_PATH_STR =
PARENT_DIR_PATH_STR + "/outOfOrderFile";
private static final String SUBDIR_PATH_STR =
PARENT_DIR_PATH_STR + "/subDir";
private static final String FILE_UNDER_SUBDIR_PATH_STR =
SUBDIR_PATH_STR + "/subDirFile";
@Test
public void testDeleteOnExit() throws Exception {
FileSystem fs = getFileSystem();
// Get a new filesystem object which is same as fs.
FileSystem s3aFs = new S3AFileSystem();
s3aFs.initialize(fs.getUri(), fs.getConf());
Path nonExistFilePath = path(NON_EXIST_FILE_PATH_STR);
Path inOrderFilePath = path(INORDER_FILE_PATH_STR);
Path outOfOrderFilePath = path(OUT_OF_ORDER_FILE_PATH_STR);
Path subDirPath = path(SUBDIR_PATH_STR);
Path fileUnderSubDirPath = path(FILE_UNDER_SUBDIR_PATH_STR);
// 1. set up the test directory.
Path dir = path("testDeleteOnExitDir");
s3aFs.mkdirs(dir);
// 2. Add a nonexisting file to DeleteOnExit set.
s3aFs.deleteOnExit(nonExistFilePath);
assertPathDoesNotExist("File " + NON_EXIST_FILE_PATH_STR + " should not exist",
nonExistFilePath);
// 3. create a file and then add it to DeleteOnExit set.
byte[] data = dataset(16, 'a', 26);
createFile(s3aFs, inOrderFilePath, true, data);
assertPathExists("File " + INORDER_FILE_PATH_STR + " should exist", inOrderFilePath);
s3aFs.deleteOnExit(inOrderFilePath);
// 4. add a path to DeleteOnExit set first, then create it.
s3aFs.deleteOnExit(outOfOrderFilePath);
createFile(s3aFs, outOfOrderFilePath, true, data);
assertPathExists("File " + OUT_OF_ORDER_FILE_PATH_STR + " should exist",
outOfOrderFilePath);
// 5. create a subdirectory, a file under it, and add subdirectory DeleteOnExit set.
s3aFs.mkdirs(subDirPath);
s3aFs.deleteOnExit(subDirPath);
createFile(s3aFs, fileUnderSubDirPath, true, data);
assertPathExists("Directory " + SUBDIR_PATH_STR + " should exist", subDirPath);
assertPathExists("File " + FILE_UNDER_SUBDIR_PATH_STR + " should exist",
fileUnderSubDirPath);
s3aFs.close();
// After s3aFs is closed, make sure that all files/directories in deleteOnExit
// set are deleted.
assertPathDoesNotExist("File " + NON_EXIST_FILE_PATH_STR + " should not exist",
nonExistFilePath);
assertPathDoesNotExist("File " + INORDER_FILE_PATH_STR + " should not exist",
inOrderFilePath);
assertPathDoesNotExist("File " + OUT_OF_ORDER_FILE_PATH_STR + " should not exist",
outOfOrderFilePath);
assertPathDoesNotExist("Directory " + SUBDIR_PATH_STR + " should not exist",
subDirPath);
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
/**
* deleteOnExit test for S3A.
*/
public class TestS3ADeleteOnExit extends AbstractS3AMockTest {
static class TestS3AFileSystem extends S3AFileSystem {
private int deleteOnDnExitCount;
public int getDeleteOnDnExitCount() {
return deleteOnDnExitCount;
}
@Override
public boolean deleteOnExit(Path f) throws IOException {
deleteOnDnExitCount++;
return super.deleteOnExit(f);
}
// This is specifically designed for deleteOnExit processing.
// In this specific case, deleteWithoutCloseCheck() will only be called in the path of
// processDeleteOnExit.
@Override
protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
boolean result = super.deleteWithoutCloseCheck(f, recursive);
deleteOnDnExitCount--;
return result;
}
}
@Test
public void testDeleteOnExit() throws Exception {
Configuration conf = createConfiguration();
TestS3AFileSystem testFs = new TestS3AFileSystem();
URI uri = URI.create(FS_S3A + "://" + BUCKET);
// unset S3CSE property from config to avoid pathIOE.
conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
testFs.initialize(uri, conf);
AmazonS3 testS3 = testFs.getAmazonS3ClientForTesting("mocking");
Path path = new Path("/file");
String key = path.toUri().getPath().substring(1);
ObjectMetadata meta = new ObjectMetadata();
meta.setContentLength(1L);
meta.setLastModified(new Date(2L));
when(testS3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
.thenReturn(meta);
testFs.deleteOnExit(path);
testFs.close();
assertEquals(0, testFs.getDeleteOnDnExitCount());
}
private ArgumentMatcher<GetObjectMetadataRequest> correctGetMetadataRequest(
String bucket, String key) {
return request -> request != null
&& request.getBucketName().equals(bucket)
&& request.getKey().equals(key);
}
}