HADOOP-16193. Add extra S3A MPU test to see what happens if a file is created during the MPU. Contributed by Steve Loughran
This commit is contained in:
parent
ee7c261e1e
commit
69ddb36876
|
@ -17,20 +17,31 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.fs.contract.s3a;
|
package org.apache.hadoop.fs.contract.s3a;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.MultipartUploader;
|
||||||
|
import org.apache.hadoop.fs.PartHandle;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.UploadHandle;
|
||||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||||
import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
|
import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
|
||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||||
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
|
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
|
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
|
||||||
import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
|
import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
|
||||||
|
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test MultipartUploader with S3A.
|
* Test MultipartUploader with S3A.
|
||||||
|
@ -159,4 +170,47 @@ public class ITestS3AContractMultipartUploader extends
|
||||||
public void testMultipartUploadReverseOrder() throws Exception {
|
public void testMultipartUploadReverseOrder() throws Exception {
|
||||||
ContractTestUtils.skip("skipped for speed");
|
ContractTestUtils.skip("skipped for speed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This creates and then deletes a zero-byte file while an upload
|
||||||
|
* is in progress, and verifies that the uploaded file is ultimately
|
||||||
|
* visible.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMultipartOverlapWithTransientFile() throws Throwable {
|
||||||
|
// until there's a way to explicitly ask for a multipart uploader from a
|
||||||
|
// specific FS, explicitly create one bonded to the raw FS.
|
||||||
|
describe("testMultipartOverlapWithTransientFile");
|
||||||
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
Path path = path("testMultipartOverlapWithTransientFile");
|
||||||
|
fs.delete(path, true);
|
||||||
|
MultipartUploader mpu = mpu(1);
|
||||||
|
UploadHandle upload1 = mpu.initialize(path);
|
||||||
|
byte[] dataset = dataset(1024, '0', 10);
|
||||||
|
final Map<Integer, PartHandle> handles = new HashMap<>();
|
||||||
|
LOG.info("Uploading multipart entry");
|
||||||
|
PartHandle value = mpu.putPart(path, new ByteArrayInputStream(dataset), 1,
|
||||||
|
upload1,
|
||||||
|
dataset.length);
|
||||||
|
// upload 1K
|
||||||
|
handles.put(1, value);
|
||||||
|
// confirm the path is absent
|
||||||
|
ContractTestUtils.assertPathDoesNotExist(fs,
|
||||||
|
"path being uploaded", path);
|
||||||
|
// now create an empty file
|
||||||
|
ContractTestUtils.touch(fs, path);
|
||||||
|
final FileStatus touchStatus = fs.getFileStatus(path);
|
||||||
|
LOG.info("0-byte file has been created: {}", touchStatus);
|
||||||
|
fs.delete(path, false);
|
||||||
|
// now complete the upload
|
||||||
|
mpu.complete(path, handles, upload1);
|
||||||
|
|
||||||
|
// wait for the data to arrive
|
||||||
|
eventually(timeToBecomeConsistentMillis(), 500, () -> {
|
||||||
|
FileStatus mpuStatus = fs.getFileStatus(path);
|
||||||
|
assertTrue("File is empty in " + mpuStatus, mpuStatus.getLen() > 0);
|
||||||
|
return mpuStatus;
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue