HADOOP-17414. Magic committer files don't have the count of bytes written collected by spark (#2530)
This needs SPARK-33739 in the matching spark branch in order to work Contributed by Steve Loughran. Change-Id: I4fe75b057159e35aacc072da3cb7343467c0c3f1
This commit is contained in:
parent
bd85f6acea
commit
fb603e81f0
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.fs.statistics;
|
||||
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;
|
||||
|
||||
/**
|
||||
* Interface for a source of duration tracking.
|
||||
*
|
||||
|
@ -36,12 +38,16 @@ public interface DurationTrackerFactory {
|
|||
* by the given count.
|
||||
*
|
||||
* The expected use is within a try-with-resources clause.
|
||||
*
|
||||
* The default implementation returns a stub duration tracker.
|
||||
* @param key statistic key prefix
|
||||
* @param count #of times to increment the matching counter in this
|
||||
* operation.
|
||||
* @return an object to close after an operation completes.
|
||||
*/
|
||||
DurationTracker trackDuration(String key, long count);
|
||||
default DurationTracker trackDuration(String key, long count) {
|
||||
return stubDurationTracker();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiate a duration tracking operation by creating/returning
|
||||
|
|
|
@ -130,6 +130,24 @@ public final class StoreStatisticNames {
|
|||
/** {@value}. */
|
||||
public static final String OP_TRUNCATE = "op_truncate";
|
||||
|
||||
/* The XAttr API */
|
||||
|
||||
/** Invoke {@code getXAttrs(Path path)}: {@value}. */
|
||||
public static final String OP_XATTR_GET_MAP = "op_xattr_get_map";
|
||||
|
||||
/** Invoke {@code getXAttr(Path, String)}: {@value}. */
|
||||
public static final String OP_XATTR_GET_NAMED = "op_xattr_get_named";
|
||||
|
||||
/**
|
||||
* Invoke {@code getXAttrs(Path path, List<String> names)}: {@value}.
|
||||
*/
|
||||
public static final String OP_XATTR_GET_NAMED_MAP =
|
||||
"op_xattr_get_named_map";
|
||||
|
||||
/** Invoke {@code listXAttrs(Path path)}: {@value}. */
|
||||
public static final String OP_XATTR_LIST = "op_xattr_list";
|
||||
|
||||
|
||||
/** {@value}. */
|
||||
public static final String DELEGATION_TOKENS_ISSUED
|
||||
= "delegation_tokens_issued";
|
||||
|
|
|
@ -1048,4 +1048,10 @@ public final class Constants {
|
|||
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE
|
||||
= "fs.s3a.capability.directory.marker.action.delete";
|
||||
|
||||
/**
|
||||
* To comply with the XAttr rules, all headers of the object retrieved
|
||||
* through the getXAttr APIs have the prefix: {@value}.
|
||||
*/
|
||||
public static final String XA_HEADER_PREFIX = "header.";
|
||||
|
||||
}
|
||||
|
|
|
@ -447,7 +447,7 @@ class S3ABlockOutputStream extends OutputStream implements
|
|||
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
|
||||
writeOperationHelper.createPutObjectRequest(key, uploadData.getFile())
|
||||
: writeOperationHelper.createPutObjectRequest(key,
|
||||
uploadData.getUploadStream(), size);
|
||||
uploadData.getUploadStream(), size, null);
|
||||
BlockUploadProgress callback =
|
||||
new BlockUploadProgress(
|
||||
block, progressListener, now());
|
||||
|
|
|
@ -108,6 +108,7 @@ import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
|
|||
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
|
||||
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
|
||||
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
|
||||
import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
|
||||
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
|
||||
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
|
||||
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
|
||||
|
@ -331,6 +332,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
*/
|
||||
private DirectoryPolicy directoryPolicy;
|
||||
|
||||
/**
|
||||
* Header processing for XAttr.
|
||||
*/
|
||||
private HeaderProcessing headerProcessing;
|
||||
|
||||
/**
|
||||
* Context accessors for re-use.
|
||||
*/
|
||||
|
@ -457,6 +463,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
magicCommitterEnabled ? "is" : "is not");
|
||||
committerIntegration = new MagicCommitIntegration(
|
||||
this, magicCommitterEnabled);
|
||||
// header processing for rename and magic committer
|
||||
headerProcessing = new HeaderProcessing(createStoreContext());
|
||||
|
||||
// instantiate S3 Select support
|
||||
selectBinding = new SelectBinding(writeHelper);
|
||||
|
@ -1781,14 +1789,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
|
||||
/**
|
||||
* Low-level call to get at the object metadata.
|
||||
* @param path path to the object
|
||||
* @param path path to the object. This will be qualified.
|
||||
* @return metadata
|
||||
* @throws IOException IO and object access problems.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@Retries.RetryTranslated
|
||||
public ObjectMetadata getObjectMetadata(Path path) throws IOException {
|
||||
return getObjectMetadata(path, null, invoker, null);
|
||||
return getObjectMetadata(makeQualified(path), null, invoker,
|
||||
"getObjectMetadata");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1800,31 +1809,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
* @return metadata
|
||||
* @throws IOException IO and object access problems.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@Retries.RetryTranslated
|
||||
public ObjectMetadata getObjectMetadata(Path path,
|
||||
private ObjectMetadata getObjectMetadata(Path path,
|
||||
ChangeTracker changeTracker, Invoker changeInvoker, String operation)
|
||||
throws IOException {
|
||||
checkNotClosed();
|
||||
return once("getObjectMetadata", path.toString(),
|
||||
String key = pathToKey(path);
|
||||
return once(operation, path.toString(),
|
||||
() ->
|
||||
// this always does a full HEAD to the object
|
||||
getObjectMetadata(
|
||||
pathToKey(path), changeTracker, changeInvoker, operation));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all the headers of the object of a path, if the object exists.
|
||||
* @param path path to probe
|
||||
* @return an immutable map of object headers.
|
||||
* @throws IOException failure of the query
|
||||
*/
|
||||
@Retries.RetryTranslated
|
||||
public Map<String, Object> getObjectHeaders(Path path) throws IOException {
|
||||
LOG.debug("getObjectHeaders({})", path);
|
||||
checkNotClosed();
|
||||
incrementReadOperations();
|
||||
return getObjectMetadata(path).getRawMetadata();
|
||||
key, changeTracker, changeInvoker, operation));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2021,7 +2016,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
@Retries.RetryRaw
|
||||
@VisibleForTesting
|
||||
ObjectMetadata getObjectMetadata(String key) throws IOException {
|
||||
return getObjectMetadata(key, null, invoker,null);
|
||||
return getObjectMetadata(key, null, invoker, "getObjectMetadata");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4099,59 +4094,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
* @return a copy of {@link ObjectMetadata} with only relevant attributes
|
||||
*/
|
||||
private ObjectMetadata cloneObjectMetadata(ObjectMetadata source) {
|
||||
// This approach may be too brittle, especially if
|
||||
// in future there are new attributes added to ObjectMetadata
|
||||
// that we do not explicitly call to set here
|
||||
ObjectMetadata ret = newObjectMetadata(source.getContentLength());
|
||||
|
||||
// Possibly null attributes
|
||||
// Allowing nulls to pass breaks it during later use
|
||||
if (source.getCacheControl() != null) {
|
||||
ret.setCacheControl(source.getCacheControl());
|
||||
}
|
||||
if (source.getContentDisposition() != null) {
|
||||
ret.setContentDisposition(source.getContentDisposition());
|
||||
}
|
||||
if (source.getContentEncoding() != null) {
|
||||
ret.setContentEncoding(source.getContentEncoding());
|
||||
}
|
||||
if (source.getContentMD5() != null) {
|
||||
ret.setContentMD5(source.getContentMD5());
|
||||
}
|
||||
if (source.getContentType() != null) {
|
||||
ret.setContentType(source.getContentType());
|
||||
}
|
||||
if (source.getExpirationTime() != null) {
|
||||
ret.setExpirationTime(source.getExpirationTime());
|
||||
}
|
||||
if (source.getExpirationTimeRuleId() != null) {
|
||||
ret.setExpirationTimeRuleId(source.getExpirationTimeRuleId());
|
||||
}
|
||||
if (source.getHttpExpiresDate() != null) {
|
||||
ret.setHttpExpiresDate(source.getHttpExpiresDate());
|
||||
}
|
||||
if (source.getLastModified() != null) {
|
||||
ret.setLastModified(source.getLastModified());
|
||||
}
|
||||
if (source.getOngoingRestore() != null) {
|
||||
ret.setOngoingRestore(source.getOngoingRestore());
|
||||
}
|
||||
if (source.getRestoreExpirationTime() != null) {
|
||||
ret.setRestoreExpirationTime(source.getRestoreExpirationTime());
|
||||
}
|
||||
if (source.getSSEAlgorithm() != null) {
|
||||
ret.setSSEAlgorithm(source.getSSEAlgorithm());
|
||||
}
|
||||
if (source.getSSECustomerAlgorithm() != null) {
|
||||
ret.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm());
|
||||
}
|
||||
if (source.getSSECustomerKeyMd5() != null) {
|
||||
ret.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5());
|
||||
}
|
||||
|
||||
for (Map.Entry<String, String> e : source.getUserMetadata().entrySet()) {
|
||||
ret.addUserMetadata(e.getKey(), e.getValue());
|
||||
}
|
||||
getHeaderProcessing().cloneObjectMetadata(source, ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -4382,6 +4326,37 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get header processing support.
|
||||
* @return the header processing of this instance.
|
||||
*/
|
||||
private HeaderProcessing getHeaderProcessing() {
|
||||
return headerProcessing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getXAttr(final Path path, final String name)
|
||||
throws IOException {
|
||||
return getHeaderProcessing().getXAttr(path, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, byte[]> getXAttrs(final Path path) throws IOException {
|
||||
return getHeaderProcessing().getXAttrs(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, byte[]> getXAttrs(final Path path,
|
||||
final List<String> names)
|
||||
throws IOException {
|
||||
return getHeaderProcessing().getXAttrs(path, names);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listXAttrs(final Path path) throws IOException {
|
||||
return getHeaderProcessing().listXAttrs(path);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}.
|
||||
*
|
||||
|
@ -5088,5 +5063,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
return S3AFileSystem.this.makeQualified(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectMetadata getObjectMetadata(final String key)
|
||||
throws IOException {
|
||||
return once("getObjectMetadata", key, () ->
|
||||
S3AFileSystem.this.getObjectMetadata(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -120,7 +120,7 @@ import static org.apache.hadoop.fs.s3a.Statistic.*;
|
|||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class S3AInstrumentation implements Closeable, MetricsSource,
|
||||
CountersAndGauges, IOStatisticsSource, DurationTrackerFactory {
|
||||
CountersAndGauges, IOStatisticsSource {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
S3AInstrumentation.class);
|
||||
|
||||
|
|
|
@ -157,6 +157,24 @@ public enum Statistic {
|
|||
"Calls of rename()",
|
||||
TYPE_COUNTER),
|
||||
|
||||
/* The XAttr API metrics are all durations */
|
||||
INVOCATION_XATTR_GET_MAP(
|
||||
StoreStatisticNames.OP_XATTR_GET_MAP,
|
||||
"Calls of getXAttrs(Path path)",
|
||||
TYPE_DURATION),
|
||||
INVOCATION_XATTR_GET_NAMED(
|
||||
StoreStatisticNames.OP_XATTR_GET_NAMED,
|
||||
"Calls of getXAttr(Path, String)",
|
||||
TYPE_DURATION),
|
||||
INVOCATION_XATTR_GET_NAMED_MAP(
|
||||
StoreStatisticNames.OP_XATTR_GET_NAMED_MAP,
|
||||
"Calls of xattr()",
|
||||
TYPE_DURATION),
|
||||
INVOCATION_OP_XATTR_LIST(
|
||||
StoreStatisticNames.OP_XATTR_LIST,
|
||||
"Calls of getXAttrs(Path path, List<String> names)",
|
||||
TYPE_DURATION),
|
||||
|
||||
/* Object IO */
|
||||
OBJECT_COPY_REQUESTS(StoreStatisticNames.OBJECT_COPY_REQUESTS,
|
||||
"Object copy requests",
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
|
@ -172,12 +173,19 @@ public class WriteOperationHelper implements WriteOperations {
|
|||
* @param destKey destination key
|
||||
* @param inputStream source data.
|
||||
* @param length size, if known. Use -1 for not known
|
||||
* @param headers optional map of custom headers.
|
||||
* @return the request
|
||||
*/
|
||||
public PutObjectRequest createPutObjectRequest(String destKey,
|
||||
InputStream inputStream, long length) {
|
||||
InputStream inputStream,
|
||||
long length,
|
||||
final Map<String, String> headers) {
|
||||
ObjectMetadata objectMetadata = newObjectMetadata(length);
|
||||
if (headers != null) {
|
||||
objectMetadata.setUserMetadata(headers);
|
||||
}
|
||||
return owner.newPutObjectRequest(destKey,
|
||||
newObjectMetadata(length),
|
||||
objectMetadata,
|
||||
inputStream);
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
|
||||
|
@ -77,10 +78,13 @@ public interface WriteOperations {
|
|||
* @param destKey destination key
|
||||
* @param inputStream source data.
|
||||
* @param length size, if known. Use -1 for not known
|
||||
* @param headers optional map of custom headers.
|
||||
* @return the request
|
||||
*/
|
||||
PutObjectRequest createPutObjectRequest(String destKey,
|
||||
InputStream inputStream, long length);
|
||||
InputStream inputStream,
|
||||
long length,
|
||||
@Nullable Map<String, String> headers);
|
||||
|
||||
/**
|
||||
* Create a {@link PutObjectRequest} request to upload a file.
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.commit;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX;
|
||||
import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME_PATTERN;
|
||||
|
||||
/**
|
||||
|
@ -316,4 +317,17 @@ public final class CommitConstants {
|
|||
public static final boolean DEFAULT_S3A_COMMITTER_GENERATE_UUID =
|
||||
false;
|
||||
|
||||
/**
|
||||
* Magic Marker header to declare final file length on magic uploads
|
||||
* marker objects: {@value}.
|
||||
*/
|
||||
public static final String X_HEADER_MAGIC_MARKER =
|
||||
"x-hadoop-s3a-magic-data-length";
|
||||
|
||||
/**
|
||||
* XAttr name of magic marker, with "header." prefix: {@value}.
|
||||
*/
|
||||
public static final String XA_MAGIC_MARKER = XA_HEADER_PREFIX
|
||||
+ X_HEADER_MAGIC_MARKER;
|
||||
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
|
@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
|
@ -50,6 +52,7 @@ import org.apache.hadoop.fs.s3a.WriteOperationHelper;
|
|||
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
|
||||
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
|
||||
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
|
||||
import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
|
||||
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
|
||||
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
|
||||
|
@ -606,6 +609,29 @@ public class CommitOperations implements IOStatisticsSource {
|
|||
return new CommitContext(writeOperations.initiateCommitOperation(path));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the magic file length of a file.
|
||||
* If the FS doesn't support the API, the attribute is missing or
|
||||
* the parse to long fails, then Optional.empty() is returned.
|
||||
* Static for some easier testability.
|
||||
* @param fs filesystem
|
||||
* @param path path
|
||||
* @return either a length or None.
|
||||
* @throws IOException on error
|
||||
* */
|
||||
public static Optional<Long> extractMagicFileLength(FileSystem fs, Path path)
|
||||
throws IOException {
|
||||
byte[] bytes;
|
||||
try {
|
||||
bytes = fs.getXAttr(path, XA_MAGIC_MARKER);
|
||||
} catch (UnsupportedOperationException e) {
|
||||
// FS doesn't support xattr.
|
||||
LOG.debug("Filesystem {} doesn't support XAttr API", fs);
|
||||
return Optional.empty();
|
||||
}
|
||||
return HeaderProcessing.extractXAttrLongValue(bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Commit context.
|
||||
*
|
||||
|
|
|
@ -20,7 +20,9 @@ package org.apache.hadoop.fs.s3a.commit.magic;
|
|||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.amazonaws.services.s3.model.PartETag;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
|
@ -37,6 +39,8 @@ import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
|
|||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
|
||||
|
||||
/**
|
||||
* Put tracker for Magic commits.
|
||||
* <p>Important</p>: must not directly or indirectly import a class which
|
||||
|
@ -122,13 +126,6 @@ public class MagicCommitTracker extends PutTracker {
|
|||
Preconditions.checkArgument(!parts.isEmpty(),
|
||||
"No uploaded parts to save");
|
||||
|
||||
// put a 0-byte file with the name of the original under-magic path
|
||||
PutObjectRequest originalDestPut = writer.createPutObjectRequest(
|
||||
originalDestKey,
|
||||
new ByteArrayInputStream(EMPTY),
|
||||
0);
|
||||
writer.uploadObject(originalDestPut);
|
||||
|
||||
// build the commit summary
|
||||
SinglePendingCommit commitData = new SinglePendingCommit();
|
||||
commitData.touch(System.currentTimeMillis());
|
||||
|
@ -150,9 +147,19 @@ public class MagicCommitTracker extends PutTracker {
|
|||
PutObjectRequest put = writer.createPutObjectRequest(
|
||||
pendingPartKey,
|
||||
new ByteArrayInputStream(bytes),
|
||||
bytes.length);
|
||||
bytes.length, null);
|
||||
writer.uploadObject(put);
|
||||
|
||||
// Add the final file length as a header
|
||||
Map<String, String> headers = new HashMap<>();
|
||||
headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
|
||||
// now put a 0-byte file with the name of the original under-magic path
|
||||
PutObjectRequest originalDestPut = writer.createPutObjectRequest(
|
||||
originalDestKey,
|
||||
new ByteArrayInputStream(EMPTY),
|
||||
0,
|
||||
headers);
|
||||
writer.uploadObject(originalDestPut);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,8 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.nio.file.AccessDeniedException;
|
||||
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.Retries;
|
||||
|
||||
|
@ -81,4 +83,15 @@ public interface ContextAccessors {
|
|||
* @return possibly new path.
|
||||
*/
|
||||
Path makeQualified(Path path);
|
||||
|
||||
/**
|
||||
* Retrieve the object metadata.
|
||||
*
|
||||
* @param key key to retrieve.
|
||||
* @return metadata
|
||||
* @throws IOException IO and object access problems.
|
||||
*/
|
||||
@Retries.RetryTranslated
|
||||
ObjectMetadata getObjectMetadata(String key) throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,500 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.impl;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import com.amazonaws.services.s3.Headers;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.Statistic;
|
||||
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX;
|
||||
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_OP_XATTR_LIST;
|
||||
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_MAP;
|
||||
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED;
|
||||
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED_MAP;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
|
||||
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
|
||||
|
||||
/**
|
||||
* Part of the S3A FS where object headers are
|
||||
* processed.
|
||||
* Implements all the various XAttr read operations.
|
||||
* Those APIs all expect byte arrays back.
|
||||
* Metadata cloning is also implemented here, so as
|
||||
* to stay in sync with custom header logic.
|
||||
*
|
||||
* The standard header names are extracted from the AWS SDK.
|
||||
* The S3A connector does not (currently) support setting them,
|
||||
* though it would be possible to do so through the createFile()
|
||||
* builder API.
|
||||
*/
|
||||
public class HeaderProcessing extends AbstractStoreOperation {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
HeaderProcessing.class);
|
||||
|
||||
/**
|
||||
* An empty buffer.
|
||||
*/
|
||||
private static final byte[] EMPTY = new byte[0];
|
||||
|
||||
|
||||
/**
|
||||
* Standard HTTP header found on some S3 objects: {@value}.
|
||||
*/
|
||||
public static final String XA_CACHE_CONTROL =
|
||||
XA_HEADER_PREFIX + Headers.CACHE_CONTROL;
|
||||
/**
|
||||
* Standard HTTP header found on some S3 objects: {@value}.
|
||||
*/
|
||||
public static final String XA_CONTENT_DISPOSITION =
|
||||
XA_HEADER_PREFIX + Headers.CONTENT_DISPOSITION;
|
||||
|
||||
/**
|
||||
* Standard HTTP header found on some S3 objects: {@value}.
|
||||
*/
|
||||
public static final String XA_CONTENT_ENCODING =
|
||||
XA_HEADER_PREFIX + Headers.CONTENT_ENCODING;
|
||||
|
||||
/**
|
||||
* Standard HTTP header found on some S3 objects: {@value}.
|
||||
*/
|
||||
public static final String XA_CONTENT_LANGUAGE =
|
||||
XA_HEADER_PREFIX + Headers.CONTENT_LANGUAGE;
|
||||
|
||||
/**
|
||||
* Length XAttr: {@value}.
|
||||
*/
|
||||
public static final String XA_CONTENT_LENGTH =
|
||||
XA_HEADER_PREFIX + Headers.CONTENT_LENGTH;
|
||||
|
||||
/**
|
||||
* Standard HTTP header found on some S3 objects: {@value}.
|
||||
*/
|
||||
public static final String XA_CONTENT_MD5 =
|
||||
XA_HEADER_PREFIX + Headers.CONTENT_MD5;
|
||||
|
||||
/**
|
||||
* Content range: {@value}.
|
||||
* This is returned on GET requests with ranges.
|
||||
*/
|
||||
public static final String XA_CONTENT_RANGE =
|
||||
XA_HEADER_PREFIX + Headers.CONTENT_RANGE;
|
||||
|
||||
/**
|
||||
* Content type: may be set when uploading.
|
||||
* {@value}.
|
||||
*/
|
||||
public static final String XA_CONTENT_TYPE =
|
||||
XA_HEADER_PREFIX + Headers.CONTENT_TYPE;
|
||||
|
||||
/**
|
||||
* Etag Header {@value}.
|
||||
* Also accessible via {@code ObjectMetadata.getEtag()}, where
|
||||
* it can be retrieved via {@code getFileChecksum(path)} if
|
||||
* the S3A connector is enabled.
|
||||
*/
|
||||
public static final String XA_ETAG = XA_HEADER_PREFIX + Headers.ETAG;
|
||||
|
||||
|
||||
/**
|
||||
* last modified XAttr: {@value}.
|
||||
*/
|
||||
public static final String XA_LAST_MODIFIED =
|
||||
XA_HEADER_PREFIX + Headers.LAST_MODIFIED;
|
||||
|
||||
/* AWS Specific Headers. May not be found on other S3 endpoints. */
|
||||
|
||||
/**
|
||||
* object archive status; empty if not on S3 Glacier
|
||||
* (i.e all normal files should be non-archived as
|
||||
* S3A and applications don't handle archived data)
|
||||
* Value {@value}.
|
||||
*/
|
||||
public static final String XA_ARCHIVE_STATUS =
|
||||
XA_HEADER_PREFIX + Headers.ARCHIVE_STATUS;
|
||||
|
||||
/**
|
||||
* Object legal hold status. {@value}.
|
||||
*/
|
||||
public static final String XA_OBJECT_LOCK_LEGAL_HOLD_STATUS =
|
||||
XA_HEADER_PREFIX + Headers.OBJECT_LOCK_LEGAL_HOLD_STATUS;
|
||||
|
||||
/**
|
||||
* Object lock mode. {@value}.
|
||||
*/
|
||||
public static final String XA_OBJECT_LOCK_MODE =
|
||||
XA_HEADER_PREFIX + Headers.OBJECT_LOCK_MODE;
|
||||
|
||||
/**
|
||||
* ISO8601 expiry date of object lock hold. {@value}.
|
||||
*/
|
||||
public static final String XA_OBJECT_LOCK_RETAIN_UNTIL_DATE =
|
||||
XA_HEADER_PREFIX + Headers.OBJECT_LOCK_RETAIN_UNTIL_DATE;
|
||||
|
||||
/**
|
||||
* Replication status for cross-region replicated objects. {@value}.
|
||||
*/
|
||||
public static final String XA_OBJECT_REPLICATION_STATUS =
|
||||
XA_HEADER_PREFIX + Headers.OBJECT_REPLICATION_STATUS;
|
||||
|
||||
/**
|
||||
* Version ID; empty for non-versioned buckets/data. {@value}.
|
||||
*/
|
||||
public static final String XA_S3_VERSION_ID =
|
||||
XA_HEADER_PREFIX + Headers.S3_VERSION_ID;
|
||||
|
||||
/**
|
||||
* The server-side encryption algorithm to use
|
||||
* with AWS-managed keys: {@value}.
|
||||
*/
|
||||
public static final String XA_SERVER_SIDE_ENCRYPTION =
|
||||
XA_HEADER_PREFIX + Headers.SERVER_SIDE_ENCRYPTION;
|
||||
|
||||
/**
|
||||
* Storage Class XAttr: {@value}.
|
||||
*/
|
||||
public static final String XA_STORAGE_CLASS =
|
||||
XA_HEADER_PREFIX + Headers.STORAGE_CLASS;
|
||||
|
||||
/**
|
||||
* Standard headers which are retrieved from HEAD Requests
|
||||
* and set as XAttrs if the response included the relevant header.
|
||||
*/
|
||||
public static final String[] XA_STANDARD_HEADERS = {
|
||||
/* HTTP standard headers */
|
||||
XA_CACHE_CONTROL,
|
||||
XA_CONTENT_DISPOSITION,
|
||||
XA_CONTENT_ENCODING,
|
||||
XA_CONTENT_LANGUAGE,
|
||||
XA_CONTENT_LENGTH,
|
||||
XA_CONTENT_MD5,
|
||||
XA_CONTENT_RANGE,
|
||||
XA_CONTENT_TYPE,
|
||||
XA_ETAG,
|
||||
XA_LAST_MODIFIED,
|
||||
/* aws headers */
|
||||
XA_ARCHIVE_STATUS,
|
||||
XA_OBJECT_LOCK_LEGAL_HOLD_STATUS,
|
||||
XA_OBJECT_LOCK_MODE,
|
||||
XA_OBJECT_LOCK_RETAIN_UNTIL_DATE,
|
||||
XA_OBJECT_REPLICATION_STATUS,
|
||||
XA_S3_VERSION_ID,
|
||||
XA_SERVER_SIDE_ENCRYPTION,
|
||||
XA_STORAGE_CLASS,
|
||||
};
|
||||
|
||||
/**
|
||||
* Content type of generic binary objects.
|
||||
* This is the default for uploaded objects.
|
||||
*/
|
||||
public static final String CONTENT_TYPE_OCTET_STREAM =
|
||||
"application/octet-stream";
|
||||
|
||||
/**
|
||||
* XML content type : {@value}.
|
||||
* This is application/xml, not text/xml, and is
|
||||
* what a HEAD of / returns as the type of a root path.
|
||||
*/
|
||||
public static final String CONTENT_TYPE_APPLICATION_XML =
|
||||
"application/xml";
|
||||
|
||||
/**
|
||||
* Construct.
|
||||
* @param storeContext store context.
|
||||
*/
|
||||
public HeaderProcessing(final StoreContext storeContext) {
|
||||
super(storeContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Query the store, get all the headers into a map. Each Header
|
||||
* has the "header." prefix.
|
||||
* Caller must have read access.
|
||||
* The value of each header is the string value of the object
|
||||
* UTF-8 encoded.
|
||||
* @param path path of object.
|
||||
* @param statistic statistic to use for duration tracking.
|
||||
* @return the headers
|
||||
* @throws IOException failure, including file not found.
|
||||
*/
|
||||
private Map<String, byte[]> retrieveHeaders(
|
||||
final Path path,
|
||||
final Statistic statistic) throws IOException {
|
||||
StoreContext context = getStoreContext();
|
||||
ContextAccessors accessors = context.getContextAccessors();
|
||||
String objectKey = accessors.pathToKey(path);
|
||||
ObjectMetadata md;
|
||||
String symbol = statistic.getSymbol();
|
||||
S3AStatisticsContext instrumentation = context.getInstrumentation();
|
||||
try {
|
||||
md = trackDuration(instrumentation, symbol, () ->
|
||||
accessors.getObjectMetadata(objectKey));
|
||||
} catch (FileNotFoundException e) {
|
||||
// no entry. It could be a directory, so try again.
|
||||
md = trackDuration(instrumentation, symbol, () ->
|
||||
accessors.getObjectMetadata(objectKey + "/"));
|
||||
}
|
||||
// all user metadata
|
||||
Map<String, String> rawHeaders = md.getUserMetadata();
|
||||
Map<String, byte[]> headers = new TreeMap<>();
|
||||
rawHeaders.forEach((key, value) ->
|
||||
headers.put(XA_HEADER_PREFIX + key, encodeBytes(value)));
|
||||
|
||||
// and add the usual content length &c, if set
|
||||
maybeSetHeader(headers, XA_CACHE_CONTROL,
|
||||
md.getCacheControl());
|
||||
maybeSetHeader(headers, XA_CONTENT_DISPOSITION,
|
||||
md.getContentDisposition());
|
||||
maybeSetHeader(headers, XA_CONTENT_ENCODING,
|
||||
md.getContentEncoding());
|
||||
maybeSetHeader(headers, XA_CONTENT_LANGUAGE,
|
||||
md.getContentLanguage());
|
||||
maybeSetHeader(headers, XA_CONTENT_LENGTH,
|
||||
md.getContentLength());
|
||||
maybeSetHeader(headers, XA_CONTENT_MD5,
|
||||
md.getContentMD5());
|
||||
maybeSetHeader(headers, XA_CONTENT_RANGE,
|
||||
md.getContentRange());
|
||||
maybeSetHeader(headers, XA_CONTENT_TYPE,
|
||||
md.getContentType());
|
||||
maybeSetHeader(headers, XA_ETAG,
|
||||
md.getETag());
|
||||
maybeSetHeader(headers, XA_LAST_MODIFIED,
|
||||
md.getLastModified());
|
||||
|
||||
// AWS custom headers
|
||||
maybeSetHeader(headers, XA_ARCHIVE_STATUS,
|
||||
md.getArchiveStatus());
|
||||
maybeSetHeader(headers, XA_OBJECT_LOCK_LEGAL_HOLD_STATUS,
|
||||
md.getObjectLockLegalHoldStatus());
|
||||
maybeSetHeader(headers, XA_OBJECT_LOCK_MODE,
|
||||
md.getObjectLockMode());
|
||||
maybeSetHeader(headers, XA_OBJECT_LOCK_RETAIN_UNTIL_DATE,
|
||||
md.getObjectLockRetainUntilDate());
|
||||
maybeSetHeader(headers, XA_OBJECT_REPLICATION_STATUS,
|
||||
md.getReplicationStatus());
|
||||
maybeSetHeader(headers, XA_S3_VERSION_ID,
|
||||
md.getVersionId());
|
||||
maybeSetHeader(headers, XA_SERVER_SIDE_ENCRYPTION,
|
||||
md.getSSEAlgorithm());
|
||||
maybeSetHeader(headers, XA_STORAGE_CLASS,
|
||||
md.getStorageClass());
|
||||
maybeSetHeader(headers, XA_STORAGE_CLASS,
|
||||
md.getReplicationStatus());
|
||||
return headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a header if the value is non null.
|
||||
*
|
||||
* @param headers header map
|
||||
* @param name header name
|
||||
* @param value value to encode.
|
||||
*/
|
||||
private void maybeSetHeader(
|
||||
final Map<String, byte[]> headers,
|
||||
final String name,
|
||||
final Object value) {
|
||||
if (value != null) {
|
||||
headers.put(name, encodeBytes(value));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stringify an object and return its bytes in UTF-8 encoding.
|
||||
* @param s source
|
||||
* @return encoded object or an empty buffer
|
||||
*/
|
||||
public static byte[] encodeBytes(@Nullable Object s) {
|
||||
return s == null
|
||||
? EMPTY
|
||||
: s.toString().getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the string value from the bytes.
|
||||
* if null : return null, otherwise the UTF-8 decoded
|
||||
* bytes.
|
||||
* @param bytes source bytes
|
||||
* @return decoded value
|
||||
*/
|
||||
public static String decodeBytes(byte[] bytes) {
|
||||
return bytes == null
|
||||
? null
|
||||
: new String(bytes, StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an XAttr name and value for a file or directory.
|
||||
* @param path Path to get extended attribute
|
||||
* @param name XAttr name.
|
||||
* @return byte[] XAttr value or null
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
public byte[] getXAttr(Path path, String name) throws IOException {
|
||||
return retrieveHeaders(path, INVOCATION_XATTR_GET_NAMED).get(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* See {@code FileSystem.getXAttrs(path}.
|
||||
*
|
||||
* @param path Path to get extended attributes
|
||||
* @return Map describing the XAttrs of the file or directory
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
|
||||
return retrieveHeaders(path, INVOCATION_XATTR_GET_MAP);
|
||||
}
|
||||
|
||||
/**
|
||||
* See {@code FileSystem.listXAttrs(path)}.
|
||||
* @param path Path to get extended attributes
|
||||
* @return List of supported XAttrs
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
public List<String> listXAttrs(final Path path) throws IOException {
|
||||
return new ArrayList<>(retrieveHeaders(path, INVOCATION_OP_XATTR_LIST)
|
||||
.keySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* See {@code FileSystem.getXAttrs(path, names}.
|
||||
* @param path Path to get extended attributes
|
||||
* @param names XAttr names.
|
||||
* @return Map describing the XAttrs of the file or directory
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
|
||||
throws IOException {
|
||||
Map<String, byte[]> headers = retrieveHeaders(path,
|
||||
INVOCATION_XATTR_GET_NAMED_MAP);
|
||||
Map<String, byte[]> result = new TreeMap<>();
|
||||
headers.entrySet().stream()
|
||||
.filter(entry -> names.contains(entry.getKey()))
|
||||
.forEach(entry -> result.put(entry.getKey(), entry.getValue()));
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an XAttr byte array to a long.
|
||||
* testability.
|
||||
* @param data data to parse
|
||||
* @return either a length or none
|
||||
*/
|
||||
public static Optional<Long> extractXAttrLongValue(byte[] data) {
|
||||
String xAttr;
|
||||
xAttr = HeaderProcessing.decodeBytes(data);
|
||||
if (StringUtils.isNotEmpty(xAttr)) {
|
||||
try {
|
||||
long l = Long.parseLong(xAttr);
|
||||
if (l >= 0) {
|
||||
return Optional.of(l);
|
||||
}
|
||||
} catch (NumberFormatException ex) {
|
||||
LOG.warn("Not a number: {}", xAttr, ex);
|
||||
}
|
||||
}
|
||||
// missing/empty header or parse failure.
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a copy of the passed {@link ObjectMetadata}.
|
||||
* Does so without using the {@link ObjectMetadata#clone()} method,
|
||||
* to avoid copying unnecessary headers.
|
||||
* This operation does not copy the {@code X_HEADER_MAGIC_MARKER}
|
||||
* header to avoid confusion. If a marker file is renamed,
|
||||
* it loses information about any remapped file.
|
||||
* If new fields are added to ObjectMetadata which are not
|
||||
* present in the user metadata headers, they will not be picked
|
||||
* up or cloned unless this operation is updated.
|
||||
* @param source the {@link ObjectMetadata} to copy
|
||||
* @param dest the metadata to update; this is the return value.
|
||||
*/
|
||||
public void cloneObjectMetadata(ObjectMetadata source,
|
||||
ObjectMetadata dest) {
|
||||
|
||||
// Possibly null attributes
|
||||
// Allowing nulls to pass breaks it during later use
|
||||
if (source.getCacheControl() != null) {
|
||||
dest.setCacheControl(source.getCacheControl());
|
||||
}
|
||||
if (source.getContentDisposition() != null) {
|
||||
dest.setContentDisposition(source.getContentDisposition());
|
||||
}
|
||||
if (source.getContentEncoding() != null) {
|
||||
dest.setContentEncoding(source.getContentEncoding());
|
||||
}
|
||||
if (source.getContentMD5() != null) {
|
||||
dest.setContentMD5(source.getContentMD5());
|
||||
}
|
||||
if (source.getContentType() != null) {
|
||||
dest.setContentType(source.getContentType());
|
||||
}
|
||||
if (source.getExpirationTime() != null) {
|
||||
dest.setExpirationTime(source.getExpirationTime());
|
||||
}
|
||||
if (source.getExpirationTimeRuleId() != null) {
|
||||
dest.setExpirationTimeRuleId(source.getExpirationTimeRuleId());
|
||||
}
|
||||
if (source.getHttpExpiresDate() != null) {
|
||||
dest.setHttpExpiresDate(source.getHttpExpiresDate());
|
||||
}
|
||||
if (source.getLastModified() != null) {
|
||||
dest.setLastModified(source.getLastModified());
|
||||
}
|
||||
if (source.getOngoingRestore() != null) {
|
||||
dest.setOngoingRestore(source.getOngoingRestore());
|
||||
}
|
||||
if (source.getRestoreExpirationTime() != null) {
|
||||
dest.setRestoreExpirationTime(source.getRestoreExpirationTime());
|
||||
}
|
||||
if (source.getSSEAlgorithm() != null) {
|
||||
dest.setSSEAlgorithm(source.getSSEAlgorithm());
|
||||
}
|
||||
if (source.getSSECustomerAlgorithm() != null) {
|
||||
dest.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm());
|
||||
}
|
||||
if (source.getSSECustomerKeyMd5() != null) {
|
||||
dest.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5());
|
||||
}
|
||||
|
||||
// copy user metadata except the magic marker header.
|
||||
source.getUserMetadata().entrySet().stream()
|
||||
.filter(e -> !e.getKey().equals(X_HEADER_MAGIC_MARKER))
|
||||
.forEach(e -> dest.addUserMetadata(e.getKey(), e.getValue()));
|
||||
}
|
||||
|
||||
}
|
|
@ -21,11 +21,12 @@ package org.apache.hadoop.fs.s3a.statistics;
|
|||
import java.time.Duration;
|
||||
|
||||
import org.apache.hadoop.fs.s3a.Statistic;
|
||||
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
|
||||
|
||||
/**
|
||||
* This is the foundational API for collecting S3A statistics.
|
||||
*/
|
||||
public interface CountersAndGauges {
|
||||
public interface CountersAndGauges extends DurationTrackerFactory {
|
||||
|
||||
/**
|
||||
* Increment a specific counter.
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
|
|||
import org.apache.hadoop.fs.s3a.statistics.S3AMultipartUploaderStatistics;
|
||||
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
|
||||
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
|
||||
import org.apache.hadoop.fs.statistics.DurationTracker;
|
||||
|
||||
/**
|
||||
* An S3A statistics context which is bonded to a
|
||||
|
@ -210,6 +211,11 @@ public class BondedS3AStatisticsContext implements S3AStatisticsContext {
|
|||
return new S3AMultipartUploaderStatisticsImpl(this::incrementCounter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DurationTracker trackDuration(final String key, final long count) {
|
||||
return getInstrumentation().trackDuration(key, count);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the interface which an integration source must implement
|
||||
* for the integration.
|
||||
|
|
|
@ -1337,6 +1337,16 @@ On `close()`, summary data would be written to the file
|
|||
`/results/latest/__magic/job400_1/task_01_01/latest.orc.lzo.pending`.
|
||||
This would contain the upload ID and all the parts and etags of uploaded data.
|
||||
|
||||
A marker file is also created, so that code which verifies that a newly created file
|
||||
exists does not fail.
|
||||
1. These marker files are zero bytes long.
|
||||
1. They declare the full length of the final file in the HTTP header
|
||||
`x-hadoop-s3a-magic-data-length`.
|
||||
1. A call to `getXAttr("header.x-hadoop-s3a-magic-data-length")` will return a
|
||||
string containing the number of bytes in the data uploaded.
|
||||
|
||||
This is needed so that the Spark write-tracking code can report how much data
|
||||
has been created.
|
||||
|
||||
#### Task commit
|
||||
|
||||
|
|
|
@ -360,6 +360,7 @@ However, it has extra requirements of the filesystem
|
|||
1. The S3A client must be configured to recognize interactions
|
||||
with the magic directories and treat them specially.
|
||||
|
||||
Now that Amazon S3 is consistent, the magic committer is enabled by default.
|
||||
|
||||
It's also not been field tested to the extent of Netflix's committer; consider
|
||||
it the least mature of the committers.
|
||||
|
|
|
@ -29,6 +29,7 @@ import com.amazonaws.services.s3.AmazonS3;
|
|||
import com.amazonaws.services.s3.model.GetBucketEncryptionResult;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -47,6 +48,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
|||
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_ETAG;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
/**
|
||||
|
@ -171,6 +173,9 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
|||
assertNotEquals("file 1 checksum", 0, checksum1.getLength());
|
||||
assertEquals("checksums of empty files", checksum1,
|
||||
fs.getFileChecksum(touchFile("file2"), 0));
|
||||
Assertions.assertThat(fs.getXAttr(file1, XA_ETAG))
|
||||
.describedAs("etag from xattr")
|
||||
.isEqualTo(checksum1.getBytes());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -222,6 +227,9 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
|||
createFile(fs, file4, true,
|
||||
"hello, world".getBytes(StandardCharsets.UTF_8));
|
||||
assertNotEquals(checksum2, fs.getFileChecksum(file4, 0));
|
||||
Assertions.assertThat(fs.getXAttr(file3, XA_ETAG))
|
||||
.describedAs("etag from xattr")
|
||||
.isEqualTo(checksum1.getBytes());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -272,7 +272,9 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|||
@Override
|
||||
public void teardown() throws Exception {
|
||||
// restore the s3 client so there's no mocking interfering with the teardown
|
||||
originalS3Client.ifPresent(fs::setAmazonS3Client);
|
||||
if (fs != null) {
|
||||
originalS3Client.ifPresent(fs::setAmazonS3Client);
|
||||
}
|
||||
super.teardown();
|
||||
}
|
||||
|
||||
|
|
|
@ -1335,11 +1335,12 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
|
|||
= outputFormat.getRecordWriter(tContext);
|
||||
IntWritable iw = new IntWritable(1);
|
||||
recordWriter.write(iw, iw);
|
||||
long expectedLength = 4;
|
||||
Path dest = recordWriter.getDest();
|
||||
validateTaskAttemptPathDuringWrite(dest);
|
||||
validateTaskAttemptPathDuringWrite(dest, expectedLength);
|
||||
recordWriter.close(tContext);
|
||||
// at this point
|
||||
validateTaskAttemptPathAfterWrite(dest);
|
||||
validateTaskAttemptPathAfterWrite(dest, expectedLength);
|
||||
assertTrue("Committer does not have data to commit " + committer,
|
||||
committer.needsTaskCommit(tContext));
|
||||
commitTask(committer, tContext);
|
||||
|
@ -1750,9 +1751,11 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
|
|||
* Validate the path of a file being written to during the write
|
||||
* itself.
|
||||
* @param p path
|
||||
* @param expectedLength
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
|
||||
protected void validateTaskAttemptPathDuringWrite(Path p,
|
||||
final long expectedLength) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
|
@ -1760,9 +1763,11 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
|
|||
* Validate the path of a file being written to after the write
|
||||
* operation has completed.
|
||||
* @param p path
|
||||
* @param expectedLength
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
|
||||
protected void validateTaskAttemptPathAfterWrite(Path p,
|
||||
final long expectedLength) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.List;
|
|||
|
||||
import com.amazonaws.services.s3.model.PartETag;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
|||
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitOperations.extractMagicFileLength;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
|
@ -216,13 +218,13 @@ public class ITestCommitOperations extends AbstractCommitITest {
|
|||
|
||||
@Test
|
||||
public void testCommitEmptyFile() throws Throwable {
|
||||
describe("create then commit an empty file");
|
||||
describe("create then commit an empty magic file");
|
||||
createCommitAndVerify("empty-commit.txt", new byte[0]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommitSmallFile() throws Throwable {
|
||||
describe("create then commit an empty file");
|
||||
describe("create then commit a small magic file");
|
||||
createCommitAndVerify("small-commit.txt", DATASET);
|
||||
}
|
||||
|
||||
|
@ -288,6 +290,64 @@ public class ITestCommitOperations extends AbstractCommitITest {
|
|||
commit("child.txt", pendingChildPath, expectedDestPath, 0, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that that when a marker file is renamed, its
|
||||
* magic marker attribute is lost.
|
||||
*/
|
||||
@Test
|
||||
public void testMarkerFileRename()
|
||||
throws Exception {
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
Path destFile = methodPath();
|
||||
Path destDir = destFile.getParent();
|
||||
fs.delete(destDir, true);
|
||||
Path magicDest = makeMagic(destFile);
|
||||
Path magicDir = magicDest.getParent();
|
||||
fs.mkdirs(magicDir);
|
||||
|
||||
// use the builder API to verify it works exactly the
|
||||
// same.
|
||||
try (FSDataOutputStream stream = fs.createFile(magicDest)
|
||||
.overwrite(true)
|
||||
.recursive()
|
||||
.build()) {
|
||||
assertIsMagicStream(stream);
|
||||
stream.write(DATASET);
|
||||
}
|
||||
Path magic2 = new Path(magicDir, "magic2");
|
||||
// rename the marker
|
||||
fs.rename(magicDest, magic2);
|
||||
|
||||
// the renamed file has no header
|
||||
Assertions.assertThat(extractMagicFileLength(fs, magic2))
|
||||
.describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + magic2)
|
||||
.isEmpty();
|
||||
// abort the upload, which is driven by the .pending files
|
||||
// there must be 1 deleted file; during test debugging with aborted
|
||||
// runs there may be more.
|
||||
Assertions.assertThat(newCommitOperations()
|
||||
.abortPendingUploadsUnderPath(destDir))
|
||||
.describedAs("Aborting all pending uploads under %s", destDir)
|
||||
.isGreaterThanOrEqualTo(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that an output stream is magic.
|
||||
* @param stream stream to probe.
|
||||
*/
|
||||
protected void assertIsMagicStream(final FSDataOutputStream stream) {
|
||||
Assertions.assertThat(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT))
|
||||
.describedAs("Stream capability %s in stream %s",
|
||||
STREAM_CAPABILITY_MAGIC_OUTPUT, stream)
|
||||
.isTrue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a file through the magic commit mechanism.
|
||||
* @param filename file to create (with __magic path.)
|
||||
* @param data data to write
|
||||
* @throws Exception failure
|
||||
*/
|
||||
private void createCommitAndVerify(String filename, byte[] data)
|
||||
throws Exception {
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
|
@ -295,19 +355,30 @@ public class ITestCommitOperations extends AbstractCommitITest {
|
|||
fs.delete(destFile.getParent(), true);
|
||||
Path magicDest = makeMagic(destFile);
|
||||
assertPathDoesNotExist("Magic file should not exist", magicDest);
|
||||
long dataSize = data != null ? data.length : 0;
|
||||
try(FSDataOutputStream stream = fs.create(magicDest, true)) {
|
||||
assertTrue(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT));
|
||||
if (data != null && data.length > 0) {
|
||||
assertIsMagicStream(stream);
|
||||
if (dataSize > 0) {
|
||||
stream.write(data);
|
||||
}
|
||||
stream.close();
|
||||
}
|
||||
FileStatus status = getFileStatusEventually(fs, magicDest,
|
||||
CONSISTENCY_WAIT);
|
||||
assertEquals("Non empty marker file: " + status, 0, status.getLen());
|
||||
|
||||
assertEquals("Magic marker file is not zero bytes: " + status,
|
||||
0, 0);
|
||||
Assertions.assertThat(extractMagicFileLength(fs,
|
||||
magicDest))
|
||||
.describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + magicDest)
|
||||
.isNotEmpty()
|
||||
.hasValue(dataSize);
|
||||
commit(filename, destFile, HIGH_THROTTLE, 0);
|
||||
verifyFileContents(fs, destFile, data);
|
||||
// the destination file doesn't have the attribute
|
||||
Assertions.assertThat(extractMagicFileLength(fs,
|
||||
destFile))
|
||||
.describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + destFile)
|
||||
.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -20,17 +20,21 @@ package org.apache.hadoop.fs.s3a.commit.magic;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitProtocol;
|
||||
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
|
||||
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
||||
import org.apache.hadoop.fs.s3a.commit.CommitOperations;
|
||||
import org.apache.hadoop.fs.s3a.commit.CommitUtils;
|
||||
import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection;
|
||||
import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl;
|
||||
|
@ -39,6 +43,7 @@ import org.apache.hadoop.mapreduce.JobStatus;
|
|||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
|
||||
|
@ -107,18 +112,44 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
|
|||
return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
|
||||
}
|
||||
|
||||
protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
|
||||
protected void validateTaskAttemptPathDuringWrite(Path p,
|
||||
final long expectedLength) throws IOException {
|
||||
String pathStr = p.toString();
|
||||
assertTrue("not magic " + pathStr,
|
||||
pathStr.contains(MAGIC));
|
||||
assertPathDoesNotExist("task attempt visible", p);
|
||||
}
|
||||
|
||||
protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
|
||||
FileStatus st = getFileSystem().getFileStatus(p);
|
||||
assertEquals("file length in " + st, 0, st.getLen());
|
||||
Path pendingFile = new Path(p.toString() + PENDING_SUFFIX);
|
||||
protected void validateTaskAttemptPathAfterWrite(Path marker,
|
||||
final long expectedLength) throws IOException {
|
||||
// the pending file exists
|
||||
Path pendingFile = new Path(marker.toString() + PENDING_SUFFIX);
|
||||
assertPathExists("pending file", pendingFile);
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
|
||||
// THIS SEQUENCE MUST BE RUN IN ORDER ON A S3GUARDED
|
||||
// STORE
|
||||
// if you list the parent dir and find the marker, it
|
||||
// is really 0 bytes long
|
||||
String name = marker.getName();
|
||||
List<LocatedFileStatus> filtered = listAndFilter(fs,
|
||||
marker.getParent(), false,
|
||||
(path) -> path.getName().equals(name));
|
||||
Assertions.assertThat(filtered)
|
||||
.hasSize(1);
|
||||
Assertions.assertThat(filtered.get(0))
|
||||
.matches(lst -> lst.getLen() == 0,
|
||||
"Listing should return 0 byte length");
|
||||
|
||||
// marker file is empty
|
||||
FileStatus st = fs.getFileStatus(marker);
|
||||
assertEquals("file length in " + st, 0, st.getLen());
|
||||
// xattr header
|
||||
Assertions.assertThat(CommitOperations.extractMagicFileLength(fs,
|
||||
marker))
|
||||
.describedAs("XAttribute " + XA_MAGIC_MARKER)
|
||||
.isNotEmpty()
|
||||
.hasValue(expectedLength);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.UUID;
|
|||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
|
@ -113,14 +114,20 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
|
|||
IOException.class);
|
||||
}
|
||||
|
||||
protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
|
||||
protected void validateTaskAttemptPathDuringWrite(Path p,
|
||||
final long expectedLength) throws IOException {
|
||||
// this is expected to be local FS
|
||||
ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p);
|
||||
}
|
||||
|
||||
protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
|
||||
protected void validateTaskAttemptPathAfterWrite(Path p,
|
||||
final long expectedLength) throws IOException {
|
||||
// this is expected to be local FS
|
||||
ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p);
|
||||
// this is expected to be local FS
|
||||
FileSystem localFS = getLocalFS();
|
||||
ContractTestUtils.assertPathExists(localFS, "task attempt", p);
|
||||
FileStatus st = localFS.getFileStatus(p);
|
||||
assertEquals("file length in " + st, expectedLength, st.getLen());
|
||||
}
|
||||
|
||||
protected FileSystem getLocalFS() throws IOException {
|
||||
|
|
|
@ -0,0 +1,219 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.impl;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.assertj.core.api.AbstractStringAssert;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_OP_XATTR_LIST;
|
||||
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_MAP;
|
||||
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_APPLICATION_XML;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_LENGTH;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_TYPE;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_STANDARD_HEADERS;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;
|
||||
import static org.apache.hadoop.fs.s3a.performance.OperationCost.CREATE_FILE_OVERWRITE;
|
||||
|
||||
/**
|
||||
* Invoke XAttr API calls against objects in S3 and validate header
|
||||
* extraction.
|
||||
*/
|
||||
public class ITestXAttrCost extends AbstractS3ACostTest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ITestXAttrCost.class);
|
||||
|
||||
private static final int GET_METADATA_ON_OBJECT = 1;
|
||||
private static final int GET_METADATA_ON_DIR = GET_METADATA_ON_OBJECT * 2;
|
||||
|
||||
public ITestXAttrCost() {
|
||||
// no parameterization here
|
||||
super(false, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXAttrRoot() throws Throwable {
|
||||
describe("Test xattr on root");
|
||||
Path root = new Path("/");
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
Map<String, byte[]> xAttrs = verifyMetrics(
|
||||
() -> fs.getXAttrs(root),
|
||||
with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_OBJECT));
|
||||
logXAttrs(xAttrs);
|
||||
List<String> headerList = verifyMetrics(() ->
|
||||
fs.listXAttrs(root),
|
||||
with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_OBJECT));
|
||||
|
||||
// verify this contains all the standard markers,
|
||||
// but not the magic marker header
|
||||
Assertions.assertThat(headerList)
|
||||
.describedAs("Headers on root object")
|
||||
.containsOnly(
|
||||
XA_CONTENT_LENGTH,
|
||||
XA_CONTENT_TYPE);
|
||||
assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
|
||||
.isEqualTo(CONTENT_TYPE_APPLICATION_XML);
|
||||
}
|
||||
|
||||
/**
|
||||
* Log the attributes as strings.
|
||||
* @param xAttrs map of attributes
|
||||
*/
|
||||
private void logXAttrs(final Map<String, byte[]> xAttrs) {
|
||||
xAttrs.forEach((k, v) ->
|
||||
LOG.info("{} has bytes[{}] => \"{}\"",
|
||||
k, v.length, decodeBytes(v)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXAttrFile() throws Throwable {
|
||||
describe("Test xattr on a file");
|
||||
Path testFile = methodPath();
|
||||
create(testFile, true, CREATE_FILE_OVERWRITE);
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
Map<String, byte[]> xAttrs = verifyMetrics(() ->
|
||||
fs.getXAttrs(testFile),
|
||||
with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_OBJECT));
|
||||
logXAttrs(xAttrs);
|
||||
assertHeaderEntry(xAttrs, XA_CONTENT_LENGTH)
|
||||
.isEqualTo("0");
|
||||
|
||||
// get the list of supported headers
|
||||
List<String> headerList = verifyMetrics(
|
||||
() -> fs.listXAttrs(testFile),
|
||||
with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_OBJECT));
|
||||
// verify this contains all the standard markers,
|
||||
// but not the magic marker header
|
||||
Assertions.assertThat(headerList)
|
||||
.describedAs("Supported headers")
|
||||
.containsAnyElementsOf(Arrays.asList(XA_STANDARD_HEADERS));
|
||||
|
||||
// ask for one header and validate its value
|
||||
byte[] bytes = verifyMetrics(() ->
|
||||
fs.getXAttr(testFile, XA_CONTENT_LENGTH),
|
||||
with(INVOCATION_XATTR_GET_NAMED, GET_METADATA_ON_OBJECT));
|
||||
assertHeader(XA_CONTENT_LENGTH, bytes)
|
||||
.isEqualTo("0");
|
||||
assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
|
||||
.isEqualTo(CONTENT_TYPE_OCTET_STREAM);
|
||||
}
|
||||
|
||||
/**
|
||||
* Directory attributes can be retrieved, but they take two HEAD requests.
|
||||
* @throws Throwable
|
||||
*/
|
||||
@Test
|
||||
public void testXAttrDir() throws Throwable {
|
||||
describe("Test xattr on a dir");
|
||||
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
Path dir = methodPath();
|
||||
fs.mkdirs(dir);
|
||||
Map<String, byte[]> xAttrs = verifyMetrics(() ->
|
||||
fs.getXAttrs(dir),
|
||||
with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_DIR));
|
||||
logXAttrs(xAttrs);
|
||||
assertHeaderEntry(xAttrs, XA_CONTENT_LENGTH)
|
||||
.isEqualTo("0");
|
||||
|
||||
// get the list of supported headers
|
||||
List<String> headerList = verifyMetrics(
|
||||
() -> fs.listXAttrs(dir),
|
||||
with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_DIR));
|
||||
// verify this contains all the standard markers,
|
||||
// but not the magic marker header
|
||||
Assertions.assertThat(headerList)
|
||||
.describedAs("Supported headers")
|
||||
.containsAnyElementsOf(Arrays.asList(XA_STANDARD_HEADERS));
|
||||
|
||||
// ask for one header and validate its value
|
||||
byte[] bytes = verifyMetrics(() ->
|
||||
fs.getXAttr(dir, XA_CONTENT_LENGTH),
|
||||
with(INVOCATION_XATTR_GET_NAMED, GET_METADATA_ON_DIR));
|
||||
assertHeader(XA_CONTENT_LENGTH, bytes)
|
||||
.isEqualTo("0");
|
||||
assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
|
||||
.isEqualTo(CONTENT_TYPE_OCTET_STREAM);
|
||||
}
|
||||
|
||||
/**
|
||||
* When the operations are called on a missing path, FNFE is
|
||||
* raised and only one attempt is made to retry the operation.
|
||||
*/
|
||||
@Test
|
||||
public void testXAttrMissingFile() throws Throwable {
|
||||
describe("Test xattr on a missing path");
|
||||
Path testFile = methodPath();
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
int getMetadataOnMissingFile = GET_METADATA_ON_DIR;
|
||||
verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
|
||||
fs.getXAttrs(testFile),
|
||||
with(INVOCATION_XATTR_GET_MAP, getMetadataOnMissingFile));
|
||||
verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
|
||||
fs.getXAttr(testFile, XA_CONTENT_LENGTH),
|
||||
with(INVOCATION_XATTR_GET_NAMED, getMetadataOnMissingFile));
|
||||
verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
|
||||
fs.listXAttrs(testFile),
|
||||
with(INVOCATION_OP_XATTR_LIST, getMetadataOnMissingFile));
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate an assert on a named header in the map.
|
||||
* @param xAttrs attribute map
|
||||
* @param key header key
|
||||
* @return the assertion
|
||||
*/
|
||||
private AbstractStringAssert<?> assertHeaderEntry(
|
||||
Map<String, byte[]> xAttrs, String key) {
|
||||
|
||||
return assertHeader(key, xAttrs.get(key));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an assertion on the header; check for the bytes
|
||||
* being non-null/empty and then returns the decoded values
|
||||
* as a string assert.
|
||||
* @param key header key (for error)
|
||||
* @param bytes value
|
||||
* @return the assertion
|
||||
*/
|
||||
private AbstractStringAssert<?> assertHeader(final String key,
|
||||
final byte[] bytes) {
|
||||
|
||||
String decoded = decodeBytes(bytes);
|
||||
return Assertions.assertThat(decoded)
|
||||
.describedAs("xattr %s decoded to: %s", key, decoded)
|
||||
.isNotNull()
|
||||
.isNotEmpty();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,313 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.assertj.core.util.Lists;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
import org.apache.hadoop.fs.s3a.test.OperationTrackingStore;
|
||||
import org.apache.hadoop.test.HadoopTestBase;
|
||||
|
||||
import static java.lang.System.currentTimeMillis;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_LAST_MODIFIED;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_LENGTH;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.encodeBytes;
|
||||
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.extractXAttrLongValue;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
|
||||
/**
|
||||
* Unit tests of header processing logic in {@link HeaderProcessing}.
|
||||
* Builds up a context accessor where the path
|
||||
* defined in {@link #MAGIC_PATH} exists and returns object metadata.
|
||||
*
|
||||
*/
|
||||
public class TestHeaderProcessing extends HadoopTestBase {
|
||||
|
||||
private static final XAttrContextAccessor CONTEXT_ACCESSORS
|
||||
= new XAttrContextAccessor();
|
||||
|
||||
public static final String VALUE = "abcdeFGHIJ123!@##&82;";
|
||||
|
||||
public static final long FILE_LENGTH = 1024;
|
||||
|
||||
private static final String FINAL_FILE = "s3a://bucket/dest/output.csv";
|
||||
|
||||
private StoreContext context;
|
||||
|
||||
private HeaderProcessing headerProcessing;
|
||||
|
||||
private static final String MAGIC_KEY
|
||||
= "dest/__magic/job1/ta1/__base/output.csv";
|
||||
private static final String MAGIC_FILE
|
||||
= "s3a://bucket/" + MAGIC_KEY;
|
||||
|
||||
private static final Path MAGIC_PATH =
|
||||
new Path(MAGIC_FILE);
|
||||
|
||||
public static final long MAGIC_LEN = 4096L;
|
||||
|
||||
/**
|
||||
* All the XAttrs which are built up.
|
||||
*/
|
||||
private static final String[] RETRIEVED_XATTRS = {
|
||||
XA_MAGIC_MARKER,
|
||||
XA_CONTENT_LENGTH,
|
||||
XA_LAST_MODIFIED
|
||||
};
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
CONTEXT_ACCESSORS.len = FILE_LENGTH;
|
||||
CONTEXT_ACCESSORS.userHeaders.put(
|
||||
X_HEADER_MAGIC_MARKER,
|
||||
Long.toString(MAGIC_LEN));
|
||||
context = S3ATestUtils.createMockStoreContext(true,
|
||||
new OperationTrackingStore(), CONTEXT_ACCESSORS);
|
||||
headerProcessing = new HeaderProcessing(context);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteRoundTrip() throws Throwable {
|
||||
Assertions.assertThat(decodeBytes(encodeBytes(VALUE)))
|
||||
.describedAs("encoding of " + VALUE)
|
||||
.isEqualTo(VALUE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMarkerXAttr() throws Throwable {
|
||||
assertAttributeHasValue(XA_MAGIC_MARKER, MAGIC_LEN);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetLengthXAttr() throws Throwable {
|
||||
assertAttributeHasValue(XA_CONTENT_LENGTH, FILE_LENGTH);
|
||||
}
|
||||
|
||||
/**
|
||||
* Last modified makes it through.
|
||||
*/
|
||||
@Test
|
||||
public void testGetDateXAttr() throws Throwable {
|
||||
Assertions.assertThat(
|
||||
decodeBytes(headerProcessing.getXAttr(MAGIC_PATH,
|
||||
XA_LAST_MODIFIED)))
|
||||
.describedAs("XAttribute " + XA_LAST_MODIFIED)
|
||||
.isEqualTo(CONTEXT_ACCESSORS.date.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* The API calls on unknown paths raise 404s.
|
||||
*/
|
||||
@Test
|
||||
public void test404() throws Throwable {
|
||||
intercept(FileNotFoundException.class, () ->
|
||||
headerProcessing.getXAttr(new Path(FINAL_FILE), XA_MAGIC_MARKER));
|
||||
}
|
||||
|
||||
/**
|
||||
* This call returns all the attributes which aren't null, including
|
||||
* all the standard HTTP headers.
|
||||
*/
|
||||
@Test
|
||||
public void testGetAllXAttrs() throws Throwable {
|
||||
Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH);
|
||||
Assertions.assertThat(xAttrs.keySet())
|
||||
.describedAs("Attribute keys")
|
||||
.contains(RETRIEVED_XATTRS);
|
||||
}
|
||||
|
||||
/**
|
||||
* This call returns all the attributes which aren't null, including
|
||||
* all the standard HTTP headers.
|
||||
*/
|
||||
@Test
|
||||
public void testListXAttrKeys() throws Throwable {
|
||||
List<String> xAttrs = headerProcessing.listXAttrs(MAGIC_PATH);
|
||||
Assertions.assertThat(xAttrs)
|
||||
.describedAs("Attribute keys")
|
||||
.contains(RETRIEVED_XATTRS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Filtering is on attribute key, not header.
|
||||
*/
|
||||
@Test
|
||||
public void testGetFilteredXAttrs() throws Throwable {
|
||||
Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH,
|
||||
Lists.list(XA_MAGIC_MARKER, XA_CONTENT_LENGTH, "unknown"));
|
||||
Assertions.assertThat(xAttrs.keySet())
|
||||
.describedAs("Attribute keys")
|
||||
.containsExactlyInAnyOrder(XA_MAGIC_MARKER, XA_CONTENT_LENGTH);
|
||||
// and the values are good
|
||||
assertLongAttributeValue(
|
||||
XA_MAGIC_MARKER,
|
||||
xAttrs.get(XA_MAGIC_MARKER),
|
||||
MAGIC_LEN);
|
||||
assertLongAttributeValue(
|
||||
XA_CONTENT_LENGTH,
|
||||
xAttrs.get(XA_CONTENT_LENGTH),
|
||||
FILE_LENGTH);
|
||||
}
|
||||
|
||||
/**
|
||||
* An empty list of keys results in empty results.
|
||||
*/
|
||||
@Test
|
||||
public void testFilterEmptyXAttrs() throws Throwable {
|
||||
Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH,
|
||||
Lists.list());
|
||||
Assertions.assertThat(xAttrs.keySet())
|
||||
.describedAs("Attribute keys")
|
||||
.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add two headers to the metadata, then verify that
|
||||
* the magic marker header is copied, but not the other header.
|
||||
*/
|
||||
@Test
|
||||
public void testMetadataCopySkipsMagicAttribute() throws Throwable {
|
||||
|
||||
final String owner = "x-header-owner";
|
||||
final String root = "root";
|
||||
CONTEXT_ACCESSORS.userHeaders.put(owner, root);
|
||||
final ObjectMetadata source = context.getContextAccessors()
|
||||
.getObjectMetadata(MAGIC_KEY);
|
||||
final Map<String, String> sourceUserMD = source.getUserMetadata();
|
||||
Assertions.assertThat(sourceUserMD.get(owner))
|
||||
.describedAs("owner header in copied MD")
|
||||
.isEqualTo(root);
|
||||
|
||||
ObjectMetadata dest = new ObjectMetadata();
|
||||
headerProcessing.cloneObjectMetadata(source, dest);
|
||||
|
||||
Assertions.assertThat(dest.getUserMetadata().get(X_HEADER_MAGIC_MARKER))
|
||||
.describedAs("Magic marker header in copied MD")
|
||||
.isNull();
|
||||
Assertions.assertThat(dest.getUserMetadata().get(owner))
|
||||
.describedAs("owner header in copied MD")
|
||||
.isEqualTo(root);
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that an XAttr has a specific long value.
|
||||
* @param key attribute key
|
||||
* @param bytes bytes of the attribute.
|
||||
* @param expected expected numeric value.
|
||||
*/
|
||||
private void assertLongAttributeValue(
|
||||
final String key,
|
||||
final byte[] bytes,
|
||||
final long expected) {
|
||||
Assertions.assertThat(extractXAttrLongValue(bytes))
|
||||
.describedAs("XAttribute " + key)
|
||||
.isNotEmpty()
|
||||
.hasValue(expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that a retrieved XAttr has a specific long value.
|
||||
* @param key attribute key
|
||||
* @param expected expected numeric value.
|
||||
*/
|
||||
protected void assertAttributeHasValue(final String key,
|
||||
final long expected)
|
||||
throws IOException {
|
||||
assertLongAttributeValue(
|
||||
key,
|
||||
headerProcessing.getXAttr(MAGIC_PATH, key),
|
||||
expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Context accessor with XAttrs returned for the {@link #MAGIC_PATH}
|
||||
* path.
|
||||
*/
|
||||
private static final class XAttrContextAccessor
|
||||
implements ContextAccessors {
|
||||
|
||||
private final Map<String, String> userHeaders = new HashMap<>();
|
||||
|
||||
private long len;
|
||||
private Date date = new Date(currentTimeMillis());
|
||||
|
||||
@Override
|
||||
public Path keyToPath(final String key) {
|
||||
return new Path("s3a://bucket/" + key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String pathToKey(final Path path) {
|
||||
// key is path with leading / stripped.
|
||||
String key = path.toUri().getPath();
|
||||
return key.length() > 1 ? key.substring(1) : key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public File createTempFile(final String prefix, final long size)
|
||||
throws IOException {
|
||||
throw new UnsupportedOperationException("unsppported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getBucketLocation() throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path makeQualified(final Path path) {
|
||||
return path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectMetadata getObjectMetadata(final String key)
|
||||
throws IOException {
|
||||
if (MAGIC_KEY.equals(key)) {
|
||||
ObjectMetadata omd = new ObjectMetadata();
|
||||
omd.setUserMetadata(userHeaders);
|
||||
omd.setContentLength(len);
|
||||
omd.setLastModified(date);
|
||||
return omd;
|
||||
} else {
|
||||
throw new FileNotFoundException(key);
|
||||
}
|
||||
}
|
||||
|
||||
public void setHeader(String key, String val) {
|
||||
userHeaders.put(key, val);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -29,6 +29,7 @@ import java.util.stream.Collectors;
|
|||
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -226,7 +227,8 @@ public class TestPartialDeleteFailures {
|
|||
}
|
||||
|
||||
|
||||
private static class MinimalContextAccessor implements ContextAccessors {
|
||||
private static final class MinimalContextAccessor
|
||||
implements ContextAccessors {
|
||||
|
||||
@Override
|
||||
public Path keyToPath(final String key) {
|
||||
|
@ -253,6 +255,12 @@ public class TestPartialDeleteFailures {
|
|||
public Path makeQualified(final Path path) {
|
||||
return path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectMetadata getObjectMetadata(final String key)
|
||||
throws IOException {
|
||||
return new ObjectMetadata();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue