HADOOP-12292. Make use of DeleteObjects optional. (Thomas Demoor via stevel)
This commit is contained in:
parent
e4c01b8b1c
commit
60474a769e
|
@ -445,6 +445,9 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
HADOOP-12755. Fix typo in defaultFS warning message. (wang)
|
||||
|
||||
HADOOP-12292. Make use of DeleteObjects optional.
|
||||
(Thomas Demoor via stevel)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
||||
|
|
|
@ -888,6 +888,15 @@ for ldap providers in the same way as above does.
|
|||
<description>Threshold before uploads or copies use parallel multipart operations.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.multiobjectdelete.enable</name>
|
||||
<value>true</value>
|
||||
<description>When enabled, multiple single-object delete requests are replaced by
|
||||
a single 'delete multiple objects'-request, reducing the number of requests.
|
||||
Beware: legacy S3-compatible object stores might not support this request.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.acl.default</name>
|
||||
<description>Set a canned ACL for newly created and copied objects. Value may be private,
|
||||
|
|
|
@ -111,6 +111,7 @@
|
|||
<exec-maven-plugin.version>1.3.1</exec-maven-plugin.version>
|
||||
<make-maven-plugin.version>1.0-beta-1</make-maven-plugin.version>
|
||||
<native-maven-plugin.version>1.0-alpha-8</native-maven-plugin.version>
|
||||
<surefire.fork.timeout>900</surefire.fork.timeout>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
|
@ -1172,7 +1173,7 @@
|
|||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<reuseForks>false</reuseForks>
|
||||
<forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
|
||||
<forkedProcessTimeoutInSeconds>${surefire.fork.timeout}</forkedProcessTimeoutInSeconds>
|
||||
<argLine>${maven-surefire-plugin.argLine}</argLine>
|
||||
<environmentVariables>
|
||||
<HADOOP_COMMON_HOME>${hadoop.common.build.dir}</HADOOP_COMMON_HOME>
|
||||
|
|
|
@ -84,6 +84,9 @@ public class Constants {
|
|||
public static final String MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
|
||||
public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
|
||||
|
||||
//enable multiobject-delete calls?
|
||||
public static final String ENABLE_MULTI_DELETE = "fs.s3a.multiobjectdelete.enable";
|
||||
|
||||
// comma separated list of directories
|
||||
public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
|
||||
|
||||
|
|
|
@ -41,6 +41,7 @@ import com.amazonaws.auth.AWSCredentialsProviderChain;
|
|||
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectRequest;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
|
@ -85,6 +86,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
private String bucket;
|
||||
private int maxKeys;
|
||||
private long partSize;
|
||||
private boolean enableMultiObjectsDelete;
|
||||
private TransferManager transfers;
|
||||
private ThreadPoolExecutor threadPoolExecutor;
|
||||
private long multiPartThreshold;
|
||||
|
@ -252,6 +254,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
|
||||
multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
|
||||
DEFAULT_MIN_MULTIPART_THRESHOLD);
|
||||
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
|
||||
|
||||
if (partSize < 5 * 1024 * 1024) {
|
||||
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
|
||||
|
@ -580,11 +583,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
copyFile(summary.getKey(), newDstKey);
|
||||
|
||||
if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
|
||||
DeleteObjectsRequest deleteRequest =
|
||||
new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
|
||||
s3.deleteObjects(deleteRequest);
|
||||
statistics.incrementWriteOps(1);
|
||||
keysToDelete.clear();
|
||||
removeKeys(keysToDelete, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -592,11 +591,8 @@ public class S3AFileSystem extends FileSystem {
|
|||
objects = s3.listNextBatchOfObjects(objects);
|
||||
statistics.incrementReadOps(1);
|
||||
} else {
|
||||
if (keysToDelete.size() > 0) {
|
||||
DeleteObjectsRequest deleteRequest =
|
||||
new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
|
||||
s3.deleteObjects(deleteRequest);
|
||||
statistics.incrementWriteOps(1);
|
||||
if (!keysToDelete.isEmpty()) {
|
||||
removeKeys(keysToDelete, false);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -610,6 +606,36 @@ public class S3AFileSystem extends FileSystem {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper method to delete a list of keys on a s3-backend.
|
||||
*
|
||||
* @param keysToDelete collection of keys to delete on the s3-backend
|
||||
* @param clearKeys clears the keysToDelete-list after processing the list
|
||||
* when set to true
|
||||
*/
|
||||
private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
|
||||
boolean clearKeys) {
|
||||
if (enableMultiObjectsDelete) {
|
||||
DeleteObjectsRequest deleteRequest
|
||||
= new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
|
||||
s3.deleteObjects(deleteRequest);
|
||||
statistics.incrementWriteOps(1);
|
||||
} else {
|
||||
int writeops = 0;
|
||||
|
||||
for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
|
||||
s3.deleteObject(
|
||||
new DeleteObjectRequest(bucket, keyVersion.getKey()));
|
||||
writeops++;
|
||||
}
|
||||
|
||||
statistics.incrementWriteOps(writeops);
|
||||
}
|
||||
if (clearKeys) {
|
||||
keysToDelete.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/** Delete a file.
|
||||
*
|
||||
* @param f the path to delete.
|
||||
|
@ -684,11 +710,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
if (keys.size() == MAX_ENTRIES_TO_DELETE) {
|
||||
DeleteObjectsRequest deleteRequest =
|
||||
new DeleteObjectsRequest(bucket).withKeys(keys);
|
||||
s3.deleteObjects(deleteRequest);
|
||||
statistics.incrementWriteOps(1);
|
||||
keys.clear();
|
||||
removeKeys(keys, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -697,10 +719,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
statistics.incrementReadOps(1);
|
||||
} else {
|
||||
if (!keys.isEmpty()) {
|
||||
DeleteObjectsRequest deleteRequest =
|
||||
new DeleteObjectsRequest(bucket).withKeys(keys);
|
||||
s3.deleteObjects(deleteRequest);
|
||||
statistics.incrementWriteOps(1);
|
||||
removeKeys(keys, false);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -272,6 +272,15 @@ If you do any of these: change your credentials immediately!
|
|||
<description>Threshold before uploads or copies use parallel multipart operations.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.multiobjectdelete.enable</name>
|
||||
<value>false</value>
|
||||
<description>When enabled, multiple single-object delete requests are replaced by
|
||||
a single 'delete multiple objects'-request, reducing the number of requests.
|
||||
Beware: legacy S3-compatible object stores might not support this request.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.acl.default</name>
|
||||
<description>Set a canned ACL for newly created and copied objects. Value may be private,
|
||||
|
|
|
@ -36,13 +36,21 @@ import static org.junit.Assume.assumeTrue;
|
|||
|
||||
/**
|
||||
* Base class for scale tests; here is where the common scale configuration
|
||||
* keys are defined
|
||||
* keys are defined.
|
||||
*/
|
||||
public class S3AScaleTestBase {
|
||||
|
||||
public static final String SCALE_TEST = "scale.test.";
|
||||
|
||||
/**
|
||||
* The number of operations to perform: {@value}
|
||||
*/
|
||||
public static final String KEY_OPERATION_COUNT =
|
||||
SCALE_TEST + "operation.count";
|
||||
|
||||
/**
|
||||
* The default number of operations to perform: {@value}
|
||||
*/
|
||||
public static final long DEFAULT_OPERATION_COUNT = 2005;
|
||||
|
||||
protected S3AFileSystem fs;
|
||||
|
@ -71,6 +79,7 @@ public class S3AScaleTestBase {
|
|||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = createConfiguration();
|
||||
LOG.info("Scale test operation count = {}", getOperationCount());
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.scale;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.s3a.Constants;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class TestS3ADeleteFilesOneByOne extends TestS3ADeleteManyFiles {
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
Configuration configuration = super.createConfiguration();
|
||||
configuration.setBoolean(Constants.ENABLE_MULTI_DELETE, false);
|
||||
return configuration;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOpenCreate() throws IOException {
|
||||
|
||||
}
|
||||
}
|
|
@ -61,7 +61,7 @@ public class TestS3ADeleteManyFiles extends S3AScaleTestBase {
|
|||
// use Executor to speed up file creation
|
||||
ExecutorService exec = Executors.newFixedThreadPool(16);
|
||||
final ExecutorCompletionService<Boolean> completionService =
|
||||
new ExecutorCompletionService<Boolean>(exec);
|
||||
new ExecutorCompletionService<>(exec);
|
||||
try {
|
||||
final byte[] data = ContractTestUtils.dataset(testBufferSize, 'a', 'z');
|
||||
|
||||
|
|
Loading…
Reference in New Issue