HADOOP-13282. S3 blob etags to be made visible in S3A status/getFileChecksum() calls.
Contributed by Steve Loughran
This commit is contained in:
parent
ef450df443
commit
c8ff0cc304
|
@ -0,0 +1,90 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.store;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An etag as a checksum.
|
||||||
|
* Consider these suitable for checking if an object has changed, but
|
||||||
|
* not suitable for comparing two different objects for equivalence,
|
||||||
|
* especially between object stores.
|
||||||
|
*/
|
||||||
|
public class EtagChecksum extends FileChecksum {
|
||||||
|
|
||||||
|
/** The algorithm name: {@value}. */
|
||||||
|
private static final String ETAG = "etag";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Etag string.
|
||||||
|
*/
|
||||||
|
private String eTag = "";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create with an empty etag.
|
||||||
|
*/
|
||||||
|
public EtagChecksum() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create with a string etag.
|
||||||
|
* @param eTag etag
|
||||||
|
*/
|
||||||
|
public EtagChecksum(String eTag) {
|
||||||
|
this.eTag = eTag;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getAlgorithmName() {
|
||||||
|
return ETAG;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getLength() {
|
||||||
|
return eTag.getBytes(StandardCharsets.UTF_8).length;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] getBytes() {
|
||||||
|
return eTag != null
|
||||||
|
? eTag.getBytes(StandardCharsets.UTF_8)
|
||||||
|
: new byte[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
out.writeUTF(eTag != null ? eTag : "");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
eTag = in.readUTF();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "etag: \"" + eTag + '"';
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This package is for classes to be shared across object stores; for internal
|
||||||
|
* use within the hadoop-* modules only. No stability guarantees.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
package org.apache.hadoop.fs.store;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -0,0 +1,85 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.store;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit test of etag operations.
|
||||||
|
*/
|
||||||
|
public class TestEtagChecksum extends Assert {
|
||||||
|
|
||||||
|
private final EtagChecksum empty1 = tag("");
|
||||||
|
private final EtagChecksum empty2 = tag("");
|
||||||
|
private final EtagChecksum valid1 = tag("valid");
|
||||||
|
private final EtagChecksum valid2 = tag("valid");
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyTagsEqual() {
|
||||||
|
assertEquals(empty1, empty2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyTagRoundTrip() throws Throwable {
|
||||||
|
assertEquals(empty1, roundTrip(empty1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidTagsEqual() {
|
||||||
|
assertEquals(valid1, valid2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidTagRoundTrip() throws Throwable {
|
||||||
|
assertEquals(valid1, roundTrip(valid1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidAndEmptyTagsDontMatch() {
|
||||||
|
assertNotEquals(valid1, empty1);
|
||||||
|
assertNotEquals(valid1, tag("other valid one"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDifferentTagsDontMatch() {
|
||||||
|
assertNotEquals(valid1, tag("other valid one"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private EtagChecksum tag(String t) {
|
||||||
|
return new EtagChecksum(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
private EtagChecksum roundTrip(EtagChecksum tag) throws IOException {
|
||||||
|
try (DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
DataInputBuffer dib = new DataInputBuffer()) {
|
||||||
|
tag.write(dob);
|
||||||
|
dib.reset(dob.getData(), dob.getLength());
|
||||||
|
EtagChecksum t2 = new EtagChecksum();
|
||||||
|
t2.readFields(dib);
|
||||||
|
return t2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -112,6 +112,7 @@ import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
|
||||||
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
|
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
|
||||||
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
|
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
|
import org.apache.hadoop.fs.store.EtagChecksum;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
@ -538,6 +539,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
||||||
return inputPolicy;
|
return inputPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the encryption algorithm of this endpoint.
|
||||||
|
* @return the encryption algorithm.
|
||||||
|
*/
|
||||||
|
public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
|
||||||
|
return serverSideEncryptionAlgorithm;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Demand create the directory allocator, then create a temporary file.
|
* Demand create the directory allocator, then create a temporary file.
|
||||||
* {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
|
* {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
|
||||||
|
@ -1069,6 +1078,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
||||||
* @throws IOException IO and object access problems.
|
* @throws IOException IO and object access problems.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@Retries.RetryRaw
|
||||||
public ObjectMetadata getObjectMetadata(Path path) throws IOException {
|
public ObjectMetadata getObjectMetadata(Path path) throws IOException {
|
||||||
return getObjectMetadata(pathToKey(path));
|
return getObjectMetadata(pathToKey(path));
|
||||||
}
|
}
|
||||||
|
@ -2934,6 +2944,36 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
||||||
return super.isFile(f);
|
return super.isFile(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the etag of a object at the path via HEAD request and return it
|
||||||
|
* as a checksum object. This has the whatever guarantees about equivalence
|
||||||
|
* the S3 implementation offers.
|
||||||
|
* <ol>
|
||||||
|
* <li>If a tag has not changed, consider the object unchanged.</li>
|
||||||
|
* <li>Two tags being different does not imply the data is different.</li>
|
||||||
|
* </ol>
|
||||||
|
* Different S3 implementations may offer different guarantees.
|
||||||
|
* @param f The file path
|
||||||
|
* @param length The length of the file range for checksum calculation
|
||||||
|
* @return The EtagChecksum or null if checksums are not supported.
|
||||||
|
* @throws IOException IO failure
|
||||||
|
* @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html">Common Response Headers</a>
|
||||||
|
*/
|
||||||
|
|
||||||
|
public EtagChecksum getFileChecksum(Path f, final long length)
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkArgument(length >= 0);
|
||||||
|
Path path = qualify(f);
|
||||||
|
LOG.debug("getFileChecksum({})", path);
|
||||||
|
return once("getFileChecksum", path.toString(),
|
||||||
|
() -> {
|
||||||
|
// this always does a full HEAD to the object
|
||||||
|
ObjectMetadata headers = getObjectMetadata(path);
|
||||||
|
String eTag = headers.getETag();
|
||||||
|
return eTag != null ? new EtagChecksum(eTag) : null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}.
|
* {@inheritDoc}.
|
||||||
*
|
*
|
||||||
|
|
|
@ -18,21 +18,24 @@
|
||||||
|
|
||||||
package org.apache.hadoop.fs.s3a;
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
|
||||||
import org.apache.hadoop.test.LambdaTestUtils;
|
|
||||||
|
|
||||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
|
||||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
|
||||||
import com.amazonaws.services.s3.model.PutObjectResult;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.Callable;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
|
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||||
|
import org.junit.Assume;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.store.EtagChecksum;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests of the S3A FileSystem which don't have a specific home and can share
|
* Tests of the S3A FileSystem which don't have a specific home and can share
|
||||||
|
@ -40,6 +43,8 @@ import java.util.concurrent.Callable;
|
||||||
*/
|
*/
|
||||||
public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
||||||
|
|
||||||
|
private static final byte[] HELLO = "hello".getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateNonRecursiveSuccess() throws IOException {
|
public void testCreateNonRecursiveSuccess() throws IOException {
|
||||||
Path shouldWork = path("nonrecursivenode");
|
Path shouldWork = path("nonrecursivenode");
|
||||||
|
@ -58,7 +63,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
||||||
@Test(expected = FileAlreadyExistsException.class)
|
@Test(expected = FileAlreadyExistsException.class)
|
||||||
public void testCreateNonRecursiveParentIsFile() throws IOException {
|
public void testCreateNonRecursiveParentIsFile() throws IOException {
|
||||||
Path parent = path("/file.txt");
|
Path parent = path("/file.txt");
|
||||||
ContractTestUtils.touch(getFileSystem(), parent);
|
touch(getFileSystem(), parent);
|
||||||
createNonRecursive(new Path(parent, "fail"));
|
createNonRecursive(new Path(parent, "fail"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,12 +78,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
||||||
new ByteArrayInputStream("PUT".getBytes()),
|
new ByteArrayInputStream("PUT".getBytes()),
|
||||||
metadata);
|
metadata);
|
||||||
LambdaTestUtils.intercept(IllegalStateException.class,
|
LambdaTestUtils.intercept(IllegalStateException.class,
|
||||||
new Callable<PutObjectResult>() {
|
() -> fs.putObjectDirect(put));
|
||||||
@Override
|
|
||||||
public PutObjectResult call() throws Exception {
|
|
||||||
return fs.putObjectDirect(put);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
assertPathDoesNotExist("put object was created", path);
|
assertPathDoesNotExist("put object was created", path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,4 +87,103 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
||||||
(short) 3, (short) 4096,
|
(short) 3, (short) 4096,
|
||||||
null);
|
null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Touch a path, return the full path.
|
||||||
|
* @param name relative name
|
||||||
|
* @return the path
|
||||||
|
* @throws IOException IO failure
|
||||||
|
*/
|
||||||
|
Path touchFile(String name) throws IOException {
|
||||||
|
Path path = path(name);
|
||||||
|
touch(getFileSystem(), path);
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a file with the data, return the path.
|
||||||
|
* @param name relative name
|
||||||
|
* @param data data to write
|
||||||
|
* @return the path
|
||||||
|
* @throws IOException IO failure
|
||||||
|
*/
|
||||||
|
Path mkFile(String name, byte[] data) throws IOException {
|
||||||
|
final Path f = path(name);
|
||||||
|
createFile(getFileSystem(), f, true, data);
|
||||||
|
return f;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The assumption here is that 0-byte files uploaded in a single PUT
|
||||||
|
* always have the same checksum, including stores with encryption.
|
||||||
|
* @throws Throwable on a failure
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testEmptyFileChecksums() throws Throwable {
|
||||||
|
final S3AFileSystem fs = getFileSystem();
|
||||||
|
Path file1 = touchFile("file1");
|
||||||
|
EtagChecksum checksum1 = fs.getFileChecksum(file1, 0);
|
||||||
|
LOG.info("Checksum for {}: {}", file1, checksum1);
|
||||||
|
assertNotNull("file 1 checksum", checksum1);
|
||||||
|
assertNotEquals("file 1 checksum", 0, checksum1.getLength());
|
||||||
|
assertEquals("checksums", checksum1,
|
||||||
|
fs.getFileChecksum(touchFile("file2"), 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that different file contents have different
|
||||||
|
* checksums, and that that they aren't the same as the empty file.
|
||||||
|
* @throws Throwable failure
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testNonEmptyFileChecksums() throws Throwable {
|
||||||
|
final S3AFileSystem fs = getFileSystem();
|
||||||
|
final Path file3 = mkFile("file3", HELLO);
|
||||||
|
final EtagChecksum checksum1 = fs.getFileChecksum(file3, 0);
|
||||||
|
assertNotNull("file 3 checksum", checksum1);
|
||||||
|
final Path file4 = touchFile("file4");
|
||||||
|
final EtagChecksum checksum2 = fs.getFileChecksum(file4, 0);
|
||||||
|
assertNotEquals("checksums", checksum1, checksum2);
|
||||||
|
// overwrite
|
||||||
|
createFile(fs, file4, true,
|
||||||
|
"hello, world".getBytes(StandardCharsets.UTF_8));
|
||||||
|
assertNotEquals(checksum2, fs.getFileChecksum(file4, 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that on an unencrypted store, the checksum of two non-empty
|
||||||
|
* (single PUT) files is the same if the data is the same.
|
||||||
|
* This will fail if the bucket has S3 default encryption enabled.
|
||||||
|
* @throws Throwable failure
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testNonEmptyFileChecksumsUnencrypted() throws Throwable {
|
||||||
|
Assume.assumeTrue(encryptionAlgorithm().equals(S3AEncryptionMethods.NONE));
|
||||||
|
final S3AFileSystem fs = getFileSystem();
|
||||||
|
final EtagChecksum checksum1 =
|
||||||
|
fs.getFileChecksum(mkFile("file5", HELLO), 0);
|
||||||
|
assertNotNull("file 3 checksum", checksum1);
|
||||||
|
assertEquals("checksums", checksum1,
|
||||||
|
fs.getFileChecksum(mkFile("file6", HELLO), 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
private S3AEncryptionMethods encryptionAlgorithm() {
|
||||||
|
return getFileSystem().getServerSideEncryptionAlgorithm();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNegativeLength() throws Throwable {
|
||||||
|
LambdaTestUtils.intercept(IllegalArgumentException.class,
|
||||||
|
() -> getFileSystem().getFileChecksum(mkFile("negative", HELLO), -1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLengthPastEOF() throws Throwable {
|
||||||
|
final S3AFileSystem fs = getFileSystem();
|
||||||
|
Path f = mkFile("file5", HELLO);
|
||||||
|
assertEquals(
|
||||||
|
fs.getFileChecksum(f, HELLO.length),
|
||||||
|
fs.getFileChecksum(f, HELLO.length * 2));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue