HADOOP-17414. Magic committer files don't have the count of bytes written collected by spark (#2530)

This needs SPARK-33739 in the matching spark branch in order to work

Contributed by Steve Loughran.
This commit is contained in:
Steve Loughran 2021-01-26 19:30:51 +00:00 committed by GitHub
parent e2a7008d50
commit 80c7404b51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1391 additions and 108 deletions

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.fs.statistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;
/**
* Interface for a source of duration tracking.
*
@ -36,12 +38,16 @@ public interface DurationTrackerFactory {
* by the given count.
*
* The expected use is within a try-with-resources clause.
*
* The default implementation returns a stub duration tracker.
* @param key statistic key prefix
* @param count #of times to increment the matching counter in this
* operation.
* @return an object to close after an operation completes.
*/
DurationTracker trackDuration(String key, long count);
default DurationTracker trackDuration(String key, long count) {
return stubDurationTracker();
}
/**
* Initiate a duration tracking operation by creating/returning

View File

@ -130,6 +130,24 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_TRUNCATE = "op_truncate";
/* The XAttr API */
/** Invoke {@code getXAttrs(Path path)}: {@value}. */
public static final String OP_XATTR_GET_MAP = "op_xattr_get_map";
/** Invoke {@code getXAttr(Path, String)}: {@value}. */
public static final String OP_XATTR_GET_NAMED = "op_xattr_get_named";
/**
* Invoke {@code getXAttrs(Path path, List<String> names)}: {@value}.
*/
public static final String OP_XATTR_GET_NAMED_MAP =
"op_xattr_get_named_map";
/** Invoke {@code listXAttrs(Path path)}: {@value}. */
public static final String OP_XATTR_LIST = "op_xattr_list";
/** {@value}. */
public static final String DELEGATION_TOKENS_ISSUED
= "delegation_tokens_issued";

View File

@ -1048,4 +1048,10 @@ public final class Constants {
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE
= "fs.s3a.capability.directory.marker.action.delete";
/**
* To comply with the XAttr rules, all headers of the object retrieved
* through the getXAttr APIs have the prefix: {@value}.
*/
public static final String XA_HEADER_PREFIX = "header.";
}

View File

@ -447,7 +447,7 @@ class S3ABlockOutputStream extends OutputStream implements
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
writeOperationHelper.createPutObjectRequest(key, uploadData.getFile())
: writeOperationHelper.createPutObjectRequest(key,
uploadData.getUploadStream(), size);
uploadData.getUploadStream(), size, null);
BlockUploadProgress callback =
new BlockUploadProgress(
block, progressListener, now());

View File

@ -107,6 +107,7 @@ import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
@ -330,6 +331,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private DirectoryPolicy directoryPolicy;
/**
* Header processing for XAttr.
*/
private HeaderProcessing headerProcessing;
/**
* Context accessors for re-use.
*/
@ -456,6 +462,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
magicCommitterEnabled ? "is" : "is not");
committerIntegration = new MagicCommitIntegration(
this, magicCommitterEnabled);
// header processing for rename and magic committer
headerProcessing = new HeaderProcessing(createStoreContext());
// instantiate S3 Select support
selectBinding = new SelectBinding(writeHelper);
@ -1781,14 +1789,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/**
* Low-level call to get at the object metadata.
* @param path path to the object
* @param path path to the object. This will be qualified.
* @return metadata
* @throws IOException IO and object access problems.
*/
@VisibleForTesting
@Retries.RetryTranslated
public ObjectMetadata getObjectMetadata(Path path) throws IOException {
return getObjectMetadata(path, null, invoker, null);
return getObjectMetadata(makeQualified(path), null, invoker,
"getObjectMetadata");
}
/**
@ -1800,31 +1809,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @return metadata
* @throws IOException IO and object access problems.
*/
@VisibleForTesting
@Retries.RetryTranslated
public ObjectMetadata getObjectMetadata(Path path,
private ObjectMetadata getObjectMetadata(Path path,
ChangeTracker changeTracker, Invoker changeInvoker, String operation)
throws IOException {
checkNotClosed();
return once("getObjectMetadata", path.toString(),
String key = pathToKey(path);
return once(operation, path.toString(),
() ->
// this always does a full HEAD to the object
getObjectMetadata(
pathToKey(path), changeTracker, changeInvoker, operation));
}
/**
* Get all the headers of the object of a path, if the object exists.
* @param path path to probe
* @return an immutable map of object headers.
* @throws IOException failure of the query
*/
@Retries.RetryTranslated
public Map<String, Object> getObjectHeaders(Path path) throws IOException {
LOG.debug("getObjectHeaders({})", path);
checkNotClosed();
incrementReadOperations();
return getObjectMetadata(path).getRawMetadata();
key, changeTracker, changeInvoker, operation));
}
/**
@ -2021,7 +2016,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
@Retries.RetryRaw
@VisibleForTesting
ObjectMetadata getObjectMetadata(String key) throws IOException {
return getObjectMetadata(key, null, invoker,null);
return getObjectMetadata(key, null, invoker, "getObjectMetadata");
}
/**
@ -4099,59 +4094,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @return a copy of {@link ObjectMetadata} with only relevant attributes
*/
private ObjectMetadata cloneObjectMetadata(ObjectMetadata source) {
// This approach may be too brittle, especially if
// in future there are new attributes added to ObjectMetadata
// that we do not explicitly call to set here
ObjectMetadata ret = newObjectMetadata(source.getContentLength());
// Possibly null attributes
// Allowing nulls to pass breaks it during later use
if (source.getCacheControl() != null) {
ret.setCacheControl(source.getCacheControl());
}
if (source.getContentDisposition() != null) {
ret.setContentDisposition(source.getContentDisposition());
}
if (source.getContentEncoding() != null) {
ret.setContentEncoding(source.getContentEncoding());
}
if (source.getContentMD5() != null) {
ret.setContentMD5(source.getContentMD5());
}
if (source.getContentType() != null) {
ret.setContentType(source.getContentType());
}
if (source.getExpirationTime() != null) {
ret.setExpirationTime(source.getExpirationTime());
}
if (source.getExpirationTimeRuleId() != null) {
ret.setExpirationTimeRuleId(source.getExpirationTimeRuleId());
}
if (source.getHttpExpiresDate() != null) {
ret.setHttpExpiresDate(source.getHttpExpiresDate());
}
if (source.getLastModified() != null) {
ret.setLastModified(source.getLastModified());
}
if (source.getOngoingRestore() != null) {
ret.setOngoingRestore(source.getOngoingRestore());
}
if (source.getRestoreExpirationTime() != null) {
ret.setRestoreExpirationTime(source.getRestoreExpirationTime());
}
if (source.getSSEAlgorithm() != null) {
ret.setSSEAlgorithm(source.getSSEAlgorithm());
}
if (source.getSSECustomerAlgorithm() != null) {
ret.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm());
}
if (source.getSSECustomerKeyMd5() != null) {
ret.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5());
}
for (Map.Entry<String, String> e : source.getUserMetadata().entrySet()) {
ret.addUserMetadata(e.getKey(), e.getValue());
}
getHeaderProcessing().cloneObjectMetadata(source, ret);
return ret;
}
@ -4382,6 +4326,37 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
}
/**
* Get header processing support.
* @return the header processing of this instance.
*/
private HeaderProcessing getHeaderProcessing() {
return headerProcessing;
}
@Override
public byte[] getXAttr(final Path path, final String name)
throws IOException {
return getHeaderProcessing().getXAttr(path, name);
}
@Override
public Map<String, byte[]> getXAttrs(final Path path) throws IOException {
return getHeaderProcessing().getXAttrs(path);
}
@Override
public Map<String, byte[]> getXAttrs(final Path path,
final List<String> names)
throws IOException {
return getHeaderProcessing().getXAttrs(path, names);
}
@Override
public List<String> listXAttrs(final Path path) throws IOException {
return getHeaderProcessing().listXAttrs(path);
}
/**
* {@inheritDoc}.
*
@ -5088,5 +5063,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return S3AFileSystem.this.makeQualified(path);
}
@Override
public ObjectMetadata getObjectMetadata(final String key)
throws IOException {
return once("getObjectMetadata", key, () ->
S3AFileSystem.this.getObjectMetadata(key));
}
}
}

View File

@ -120,7 +120,7 @@ import static org.apache.hadoop.fs.s3a.Statistic.*;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AInstrumentation implements Closeable, MetricsSource,
CountersAndGauges, IOStatisticsSource, DurationTrackerFactory {
CountersAndGauges, IOStatisticsSource {
private static final Logger LOG = LoggerFactory.getLogger(
S3AInstrumentation.class);

View File

@ -157,6 +157,24 @@ public enum Statistic {
"Calls of rename()",
TYPE_COUNTER),
/* The XAttr API metrics are all durations */
INVOCATION_XATTR_GET_MAP(
StoreStatisticNames.OP_XATTR_GET_MAP,
"Calls of getXAttrs(Path path)",
TYPE_DURATION),
INVOCATION_XATTR_GET_NAMED(
StoreStatisticNames.OP_XATTR_GET_NAMED,
"Calls of getXAttr(Path, String)",
TYPE_DURATION),
INVOCATION_XATTR_GET_NAMED_MAP(
StoreStatisticNames.OP_XATTR_GET_NAMED_MAP,
"Calls of xattr()",
TYPE_DURATION),
INVOCATION_OP_XATTR_LIST(
StoreStatisticNames.OP_XATTR_LIST,
"Calls of getXAttrs(Path path, List<String> names)",
TYPE_DURATION),
/* Object IO */
OBJECT_COPY_REQUESTS(StoreStatisticNames.OBJECT_COPY_REQUESTS,
"Object copy requests",

View File

@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.services.s3.model.AmazonS3Exception;
@ -172,12 +173,19 @@ public class WriteOperationHelper implements WriteOperations {
* @param destKey destination key
* @param inputStream source data.
* @param length size, if known. Use -1 for not known
* @param headers optional map of custom headers.
* @return the request
*/
public PutObjectRequest createPutObjectRequest(String destKey,
InputStream inputStream, long length) {
InputStream inputStream,
long length,
final Map<String, String> headers) {
ObjectMetadata objectMetadata = newObjectMetadata(length);
if (headers != null) {
objectMetadata.setUserMetadata(headers);
}
return owner.newPutObjectRequest(destKey,
newObjectMetadata(length),
objectMetadata,
inputStream);
}

View File

@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@ -77,10 +78,13 @@ public interface WriteOperations {
* @param destKey destination key
* @param inputStream source data.
* @param length size, if known. Use -1 for not known
* @param headers optional map of custom headers.
* @return the request
*/
PutObjectRequest createPutObjectRequest(String destKey,
InputStream inputStream, long length);
InputStream inputStream,
long length,
@Nullable Map<String, String> headers);
/**
* Create a {@link PutObjectRequest} request to upload a file.

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.commit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX;
import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME_PATTERN;
/**
@ -316,4 +317,17 @@ public final class CommitConstants {
public static final boolean DEFAULT_S3A_COMMITTER_GENERATE_UUID =
false;
/**
* Magic Marker header to declare final file length on magic uploads
* marker objects: {@value}.
*/
public static final String X_HEADER_MAGIC_MARKER =
"x-hadoop-s3a-magic-data-length";
/**
* XAttr name of magic marker, with "header." prefix: {@value}.
*/
public static final String XA_MAGIC_MARKER = XA_HEADER_PREFIX
+ X_HEADER_MAGIC_MARKER;
}

View File

@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@ -50,6 +52,7 @@ import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@ -606,6 +609,29 @@ public class CommitOperations implements IOStatisticsSource {
return new CommitContext(writeOperations.initiateCommitOperation(path));
}
/**
* Get the magic file length of a file.
* If the FS doesn't support the API, the attribute is missing or
* the parse to long fails, then Optional.empty() is returned.
* Static for some easier testability.
* @param fs filesystem
* @param path path
* @return either a length or None.
* @throws IOException on error
* */
public static Optional<Long> extractMagicFileLength(FileSystem fs, Path path)
throws IOException {
byte[] bytes;
try {
bytes = fs.getXAttr(path, XA_MAGIC_MARKER);
} catch (UnsupportedOperationException e) {
// FS doesn't support xattr.
LOG.debug("Filesystem {} doesn't support XAttr API", fs);
return Optional.empty();
}
return HeaderProcessing.extractXAttrLongValue(bytes);
}
/**
* Commit context.
*

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
@ -37,6 +39,8 @@ import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
/**
* Put tracker for Magic commits.
* <p>Important</p>: must not directly or indirectly import a class which
@ -122,13 +126,6 @@ public class MagicCommitTracker extends PutTracker {
Preconditions.checkArgument(!parts.isEmpty(),
"No uploaded parts to save");
// put a 0-byte file with the name of the original under-magic path
PutObjectRequest originalDestPut = writer.createPutObjectRequest(
originalDestKey,
new ByteArrayInputStream(EMPTY),
0);
writer.uploadObject(originalDestPut);
// build the commit summary
SinglePendingCommit commitData = new SinglePendingCommit();
commitData.touch(System.currentTimeMillis());
@ -150,9 +147,19 @@ public class MagicCommitTracker extends PutTracker {
PutObjectRequest put = writer.createPutObjectRequest(
pendingPartKey,
new ByteArrayInputStream(bytes),
bytes.length);
bytes.length, null);
writer.uploadObject(put);
// Add the final file length as a header
Map<String, String> headers = new HashMap<>();
headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
// now put a 0-byte file with the name of the original under-magic path
PutObjectRequest originalDestPut = writer.createPutObjectRequest(
originalDestKey,
new ByteArrayInputStream(EMPTY),
0,
headers);
writer.uploadObject(originalDestPut);
return false;
}

View File

@ -22,6 +22,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Retries;
@ -81,4 +83,15 @@ public interface ContextAccessors {
* @return possibly new path.
*/
Path makeQualified(Path path);
/**
* Retrieve the object metadata.
*
* @param key key to retrieve.
* @return metadata
* @throws IOException IO and object access problems.
*/
@Retries.RetryTranslated
ObjectMetadata getObjectMetadata(String key) throws IOException;
}

View File

@ -0,0 +1,500 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX;
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_OP_XATTR_LIST;
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_MAP;
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED;
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED_MAP;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
/**
* Part of the S3A FS where object headers are
* processed.
* Implements all the various XAttr read operations.
* Those APIs all expect byte arrays back.
* Metadata cloning is also implemented here, so as
* to stay in sync with custom header logic.
*
* The standard header names are extracted from the AWS SDK.
* The S3A connector does not (currently) support setting them,
* though it would be possible to do so through the createFile()
* builder API.
*/
public class HeaderProcessing extends AbstractStoreOperation {
private static final Logger LOG = LoggerFactory.getLogger(
HeaderProcessing.class);
/**
* An empty buffer.
*/
private static final byte[] EMPTY = new byte[0];
/**
* Standard HTTP header found on some S3 objects: {@value}.
*/
public static final String XA_CACHE_CONTROL =
XA_HEADER_PREFIX + Headers.CACHE_CONTROL;
/**
* Standard HTTP header found on some S3 objects: {@value}.
*/
public static final String XA_CONTENT_DISPOSITION =
XA_HEADER_PREFIX + Headers.CONTENT_DISPOSITION;
/**
* Standard HTTP header found on some S3 objects: {@value}.
*/
public static final String XA_CONTENT_ENCODING =
XA_HEADER_PREFIX + Headers.CONTENT_ENCODING;
/**
* Standard HTTP header found on some S3 objects: {@value}.
*/
public static final String XA_CONTENT_LANGUAGE =
XA_HEADER_PREFIX + Headers.CONTENT_LANGUAGE;
/**
* Length XAttr: {@value}.
*/
public static final String XA_CONTENT_LENGTH =
XA_HEADER_PREFIX + Headers.CONTENT_LENGTH;
/**
* Standard HTTP header found on some S3 objects: {@value}.
*/
public static final String XA_CONTENT_MD5 =
XA_HEADER_PREFIX + Headers.CONTENT_MD5;
/**
* Content range: {@value}.
* This is returned on GET requests with ranges.
*/
public static final String XA_CONTENT_RANGE =
XA_HEADER_PREFIX + Headers.CONTENT_RANGE;
/**
* Content type: may be set when uploading.
* {@value}.
*/
public static final String XA_CONTENT_TYPE =
XA_HEADER_PREFIX + Headers.CONTENT_TYPE;
/**
* Etag Header {@value}.
* Also accessible via {@code ObjectMetadata.getEtag()}, where
* it can be retrieved via {@code getFileChecksum(path)} if
* the S3A connector is enabled.
*/
public static final String XA_ETAG = XA_HEADER_PREFIX + Headers.ETAG;
/**
* last modified XAttr: {@value}.
*/
public static final String XA_LAST_MODIFIED =
XA_HEADER_PREFIX + Headers.LAST_MODIFIED;
/* AWS Specific Headers. May not be found on other S3 endpoints. */
/**
* object archive status; empty if not on S3 Glacier
* (i.e all normal files should be non-archived as
* S3A and applications don't handle archived data)
* Value {@value}.
*/
public static final String XA_ARCHIVE_STATUS =
XA_HEADER_PREFIX + Headers.ARCHIVE_STATUS;
/**
* Object legal hold status. {@value}.
*/
public static final String XA_OBJECT_LOCK_LEGAL_HOLD_STATUS =
XA_HEADER_PREFIX + Headers.OBJECT_LOCK_LEGAL_HOLD_STATUS;
/**
* Object lock mode. {@value}.
*/
public static final String XA_OBJECT_LOCK_MODE =
XA_HEADER_PREFIX + Headers.OBJECT_LOCK_MODE;
/**
* ISO8601 expiry date of object lock hold. {@value}.
*/
public static final String XA_OBJECT_LOCK_RETAIN_UNTIL_DATE =
XA_HEADER_PREFIX + Headers.OBJECT_LOCK_RETAIN_UNTIL_DATE;
/**
* Replication status for cross-region replicated objects. {@value}.
*/
public static final String XA_OBJECT_REPLICATION_STATUS =
XA_HEADER_PREFIX + Headers.OBJECT_REPLICATION_STATUS;
/**
* Version ID; empty for non-versioned buckets/data. {@value}.
*/
public static final String XA_S3_VERSION_ID =
XA_HEADER_PREFIX + Headers.S3_VERSION_ID;
/**
* The server-side encryption algorithm to use
* with AWS-managed keys: {@value}.
*/
public static final String XA_SERVER_SIDE_ENCRYPTION =
XA_HEADER_PREFIX + Headers.SERVER_SIDE_ENCRYPTION;
/**
* Storage Class XAttr: {@value}.
*/
public static final String XA_STORAGE_CLASS =
XA_HEADER_PREFIX + Headers.STORAGE_CLASS;
/**
* Standard headers which are retrieved from HEAD Requests
* and set as XAttrs if the response included the relevant header.
*/
public static final String[] XA_STANDARD_HEADERS = {
/* HTTP standard headers */
XA_CACHE_CONTROL,
XA_CONTENT_DISPOSITION,
XA_CONTENT_ENCODING,
XA_CONTENT_LANGUAGE,
XA_CONTENT_LENGTH,
XA_CONTENT_MD5,
XA_CONTENT_RANGE,
XA_CONTENT_TYPE,
XA_ETAG,
XA_LAST_MODIFIED,
/* aws headers */
XA_ARCHIVE_STATUS,
XA_OBJECT_LOCK_LEGAL_HOLD_STATUS,
XA_OBJECT_LOCK_MODE,
XA_OBJECT_LOCK_RETAIN_UNTIL_DATE,
XA_OBJECT_REPLICATION_STATUS,
XA_S3_VERSION_ID,
XA_SERVER_SIDE_ENCRYPTION,
XA_STORAGE_CLASS,
};
/**
* Content type of generic binary objects.
* This is the default for uploaded objects.
*/
public static final String CONTENT_TYPE_OCTET_STREAM =
"application/octet-stream";
/**
* XML content type : {@value}.
* This is application/xml, not text/xml, and is
* what a HEAD of / returns as the type of a root path.
*/
public static final String CONTENT_TYPE_APPLICATION_XML =
"application/xml";
/**
* Construct.
* @param storeContext store context.
*/
public HeaderProcessing(final StoreContext storeContext) {
super(storeContext);
}
/**
* Query the store, get all the headers into a map. Each Header
* has the "header." prefix.
* Caller must have read access.
* The value of each header is the string value of the object
* UTF-8 encoded.
* @param path path of object.
* @param statistic statistic to use for duration tracking.
* @return the headers
* @throws IOException failure, including file not found.
*/
private Map<String, byte[]> retrieveHeaders(
final Path path,
final Statistic statistic) throws IOException {
StoreContext context = getStoreContext();
ContextAccessors accessors = context.getContextAccessors();
String objectKey = accessors.pathToKey(path);
ObjectMetadata md;
String symbol = statistic.getSymbol();
S3AStatisticsContext instrumentation = context.getInstrumentation();
try {
md = trackDuration(instrumentation, symbol, () ->
accessors.getObjectMetadata(objectKey));
} catch (FileNotFoundException e) {
// no entry. It could be a directory, so try again.
md = trackDuration(instrumentation, symbol, () ->
accessors.getObjectMetadata(objectKey + "/"));
}
// all user metadata
Map<String, String> rawHeaders = md.getUserMetadata();
Map<String, byte[]> headers = new TreeMap<>();
rawHeaders.forEach((key, value) ->
headers.put(XA_HEADER_PREFIX + key, encodeBytes(value)));
// and add the usual content length &c, if set
maybeSetHeader(headers, XA_CACHE_CONTROL,
md.getCacheControl());
maybeSetHeader(headers, XA_CONTENT_DISPOSITION,
md.getContentDisposition());
maybeSetHeader(headers, XA_CONTENT_ENCODING,
md.getContentEncoding());
maybeSetHeader(headers, XA_CONTENT_LANGUAGE,
md.getContentLanguage());
maybeSetHeader(headers, XA_CONTENT_LENGTH,
md.getContentLength());
maybeSetHeader(headers, XA_CONTENT_MD5,
md.getContentMD5());
maybeSetHeader(headers, XA_CONTENT_RANGE,
md.getContentRange());
maybeSetHeader(headers, XA_CONTENT_TYPE,
md.getContentType());
maybeSetHeader(headers, XA_ETAG,
md.getETag());
maybeSetHeader(headers, XA_LAST_MODIFIED,
md.getLastModified());
// AWS custom headers
maybeSetHeader(headers, XA_ARCHIVE_STATUS,
md.getArchiveStatus());
maybeSetHeader(headers, XA_OBJECT_LOCK_LEGAL_HOLD_STATUS,
md.getObjectLockLegalHoldStatus());
maybeSetHeader(headers, XA_OBJECT_LOCK_MODE,
md.getObjectLockMode());
maybeSetHeader(headers, XA_OBJECT_LOCK_RETAIN_UNTIL_DATE,
md.getObjectLockRetainUntilDate());
maybeSetHeader(headers, XA_OBJECT_REPLICATION_STATUS,
md.getReplicationStatus());
maybeSetHeader(headers, XA_S3_VERSION_ID,
md.getVersionId());
maybeSetHeader(headers, XA_SERVER_SIDE_ENCRYPTION,
md.getSSEAlgorithm());
maybeSetHeader(headers, XA_STORAGE_CLASS,
md.getStorageClass());
maybeSetHeader(headers, XA_STORAGE_CLASS,
md.getReplicationStatus());
return headers;
}
/**
* Set a header if the value is non null.
*
* @param headers header map
* @param name header name
* @param value value to encode.
*/
private void maybeSetHeader(
final Map<String, byte[]> headers,
final String name,
final Object value) {
if (value != null) {
headers.put(name, encodeBytes(value));
}
}
/**
* Stringify an object and return its bytes in UTF-8 encoding.
* @param s source
* @return encoded object or an empty buffer
*/
public static byte[] encodeBytes(@Nullable Object s) {
return s == null
? EMPTY
: s.toString().getBytes(StandardCharsets.UTF_8);
}
/**
* Get the string value from the bytes.
* if null : return null, otherwise the UTF-8 decoded
* bytes.
* @param bytes source bytes
* @return decoded value
*/
public static String decodeBytes(byte[] bytes) {
return bytes == null
? null
: new String(bytes, StandardCharsets.UTF_8);
}
/**
* Get an XAttr name and value for a file or directory.
* @param path Path to get extended attribute
* @param name XAttr name.
* @return byte[] XAttr value or null
* @throws IOException IO failure
*/
public byte[] getXAttr(Path path, String name) throws IOException {
return retrieveHeaders(path, INVOCATION_XATTR_GET_NAMED).get(name);
}
/**
* See {@code FileSystem.getXAttrs(path}.
*
* @param path Path to get extended attributes
* @return Map describing the XAttrs of the file or directory
* @throws IOException IO failure
*/
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
return retrieveHeaders(path, INVOCATION_XATTR_GET_MAP);
}
/**
* See {@code FileSystem.listXAttrs(path)}.
* @param path Path to get extended attributes
* @return List of supported XAttrs
* @throws IOException IO failure
*/
public List<String> listXAttrs(final Path path) throws IOException {
return new ArrayList<>(retrieveHeaders(path, INVOCATION_OP_XATTR_LIST)
.keySet());
}
/**
* See {@code FileSystem.getXAttrs(path, names}.
* @param path Path to get extended attributes
* @param names XAttr names.
* @return Map describing the XAttrs of the file or directory
* @throws IOException IO failure
*/
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
throws IOException {
Map<String, byte[]> headers = retrieveHeaders(path,
INVOCATION_XATTR_GET_NAMED_MAP);
Map<String, byte[]> result = new TreeMap<>();
headers.entrySet().stream()
.filter(entry -> names.contains(entry.getKey()))
.forEach(entry -> result.put(entry.getKey(), entry.getValue()));
return result;
}
/**
* Convert an XAttr byte array to a long.
* testability.
* @param data data to parse
* @return either a length or none
*/
public static Optional<Long> extractXAttrLongValue(byte[] data) {
String xAttr;
xAttr = HeaderProcessing.decodeBytes(data);
if (StringUtils.isNotEmpty(xAttr)) {
try {
long l = Long.parseLong(xAttr);
if (l >= 0) {
return Optional.of(l);
}
} catch (NumberFormatException ex) {
LOG.warn("Not a number: {}", xAttr, ex);
}
}
// missing/empty header or parse failure.
return Optional.empty();
}
/**
* Creates a copy of the passed {@link ObjectMetadata}.
* Does so without using the {@link ObjectMetadata#clone()} method,
* to avoid copying unnecessary headers.
* This operation does not copy the {@code X_HEADER_MAGIC_MARKER}
* header to avoid confusion. If a marker file is renamed,
* it loses information about any remapped file.
* If new fields are added to ObjectMetadata which are not
* present in the user metadata headers, they will not be picked
* up or cloned unless this operation is updated.
* @param source the {@link ObjectMetadata} to copy
* @param dest the metadata to update; this is the return value.
*/
public void cloneObjectMetadata(ObjectMetadata source,
ObjectMetadata dest) {
// Possibly null attributes
// Allowing nulls to pass breaks it during later use
if (source.getCacheControl() != null) {
dest.setCacheControl(source.getCacheControl());
}
if (source.getContentDisposition() != null) {
dest.setContentDisposition(source.getContentDisposition());
}
if (source.getContentEncoding() != null) {
dest.setContentEncoding(source.getContentEncoding());
}
if (source.getContentMD5() != null) {
dest.setContentMD5(source.getContentMD5());
}
if (source.getContentType() != null) {
dest.setContentType(source.getContentType());
}
if (source.getExpirationTime() != null) {
dest.setExpirationTime(source.getExpirationTime());
}
if (source.getExpirationTimeRuleId() != null) {
dest.setExpirationTimeRuleId(source.getExpirationTimeRuleId());
}
if (source.getHttpExpiresDate() != null) {
dest.setHttpExpiresDate(source.getHttpExpiresDate());
}
if (source.getLastModified() != null) {
dest.setLastModified(source.getLastModified());
}
if (source.getOngoingRestore() != null) {
dest.setOngoingRestore(source.getOngoingRestore());
}
if (source.getRestoreExpirationTime() != null) {
dest.setRestoreExpirationTime(source.getRestoreExpirationTime());
}
if (source.getSSEAlgorithm() != null) {
dest.setSSEAlgorithm(source.getSSEAlgorithm());
}
if (source.getSSECustomerAlgorithm() != null) {
dest.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm());
}
if (source.getSSECustomerKeyMd5() != null) {
dest.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5());
}
// copy user metadata except the magic marker header.
source.getUserMetadata().entrySet().stream()
.filter(e -> !e.getKey().equals(X_HEADER_MAGIC_MARKER))
.forEach(e -> dest.addUserMetadata(e.getKey(), e.getValue()));
}
}

View File

@ -21,11 +21,12 @@ package org.apache.hadoop.fs.s3a.statistics;
import java.time.Duration;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
/**
* This is the foundational API for collecting S3A statistics.
*/
public interface CountersAndGauges {
public interface CountersAndGauges extends DurationTrackerFactory {
/**
* Increment a specific counter.

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AMultipartUploaderStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
import org.apache.hadoop.fs.statistics.DurationTracker;
/**
* An S3A statistics context which is bonded to a
@ -210,6 +211,11 @@ public class BondedS3AStatisticsContext implements S3AStatisticsContext {
return new S3AMultipartUploaderStatisticsImpl(this::incrementCounter);
}
@Override
public DurationTracker trackDuration(final String key, final long count) {
return getInstrumentation().trackDuration(key, count);
}
/**
* This is the interface which an integration source must implement
* for the integration.

View File

@ -1337,6 +1337,16 @@ On `close()`, summary data would be written to the file
`/results/latest/__magic/job400_1/task_01_01/latest.orc.lzo.pending`.
This would contain the upload ID and all the parts and etags of uploaded data.
A marker file is also created, so that code which verifies that a newly created file
exists does not fail.
1. These marker files are zero bytes long.
1. They declare the full length of the final file in the HTTP header
`x-hadoop-s3a-magic-data-length`.
1. A call to `getXAttr("header.x-hadoop-s3a-magic-data-length")` will return a
string containing the number of bytes in the data uploaded.
This is needed so that the Spark write-tracking code can report how much data
has been created.
#### Task commit

View File

@ -360,6 +360,7 @@ However, it has extra requirements of the filesystem
1. The S3A client must be configured to recognize interactions
with the magic directories and treat them specially.
Now that Amazon S3 is consistent, the magic committer is enabled by default.
It's also not been field tested to the extent of Netflix's committer; consider
it the least mature of the committers.

View File

@ -29,6 +29,7 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetBucketEncryptionResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
@ -47,6 +48,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_ETAG;
import static org.hamcrest.Matchers.nullValue;
/**
@ -171,6 +173,9 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
assertNotEquals("file 1 checksum", 0, checksum1.getLength());
assertEquals("checksums of empty files", checksum1,
fs.getFileChecksum(touchFile("file2"), 0));
Assertions.assertThat(fs.getXAttr(file1, XA_ETAG))
.describedAs("etag from xattr")
.isEqualTo(checksum1.getBytes());
}
/**
@ -222,6 +227,9 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
createFile(fs, file4, true,
"hello, world".getBytes(StandardCharsets.UTF_8));
assertNotEquals(checksum2, fs.getFileChecksum(file4, 0));
Assertions.assertThat(fs.getXAttr(file3, XA_ETAG))
.describedAs("etag from xattr")
.isEqualTo(checksum1.getBytes());
}
/**

View File

@ -272,7 +272,9 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
@Override
public void teardown() throws Exception {
// restore the s3 client so there's no mocking interfering with the teardown
originalS3Client.ifPresent(fs::setAmazonS3Client);
if (fs != null) {
originalS3Client.ifPresent(fs::setAmazonS3Client);
}
super.teardown();
}

View File

@ -1335,11 +1335,12 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
= outputFormat.getRecordWriter(tContext);
IntWritable iw = new IntWritable(1);
recordWriter.write(iw, iw);
long expectedLength = 4;
Path dest = recordWriter.getDest();
validateTaskAttemptPathDuringWrite(dest);
validateTaskAttemptPathDuringWrite(dest, expectedLength);
recordWriter.close(tContext);
// at this point
validateTaskAttemptPathAfterWrite(dest);
validateTaskAttemptPathAfterWrite(dest, expectedLength);
assertTrue("Committer does not have data to commit " + committer,
committer.needsTaskCommit(tContext));
commitTask(committer, tContext);
@ -1750,9 +1751,11 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
* Validate the path of a file being written to during the write
* itself.
* @param p path
* @param expectedLength
* @throws IOException IO failure
*/
protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
protected void validateTaskAttemptPathDuringWrite(Path p,
final long expectedLength) throws IOException {
}
@ -1760,9 +1763,11 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
* Validate the path of a file being written to after the write
* operation has completed.
* @param p path
* @param expectedLength
* @throws IOException IO failure
*/
protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
protected void validateTaskAttemptPathAfterWrite(Path p,
final long expectedLength) throws IOException {
}

View File

@ -26,6 +26,7 @@ import java.util.List;
import com.amazonaws.services.s3.model.PartETag;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.CommitOperations.extractMagicFileLength;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
@ -216,13 +218,13 @@ public class ITestCommitOperations extends AbstractCommitITest {
@Test
public void testCommitEmptyFile() throws Throwable {
describe("create then commit an empty file");
describe("create then commit an empty magic file");
createCommitAndVerify("empty-commit.txt", new byte[0]);
}
@Test
public void testCommitSmallFile() throws Throwable {
describe("create then commit an empty file");
describe("create then commit a small magic file");
createCommitAndVerify("small-commit.txt", DATASET);
}
@ -288,6 +290,64 @@ public class ITestCommitOperations extends AbstractCommitITest {
commit("child.txt", pendingChildPath, expectedDestPath, 0, 0);
}
/**
* Verify that that when a marker file is renamed, its
* magic marker attribute is lost.
*/
@Test
public void testMarkerFileRename()
throws Exception {
S3AFileSystem fs = getFileSystem();
Path destFile = methodPath();
Path destDir = destFile.getParent();
fs.delete(destDir, true);
Path magicDest = makeMagic(destFile);
Path magicDir = magicDest.getParent();
fs.mkdirs(magicDir);
// use the builder API to verify it works exactly the
// same.
try (FSDataOutputStream stream = fs.createFile(magicDest)
.overwrite(true)
.recursive()
.build()) {
assertIsMagicStream(stream);
stream.write(DATASET);
}
Path magic2 = new Path(magicDir, "magic2");
// rename the marker
fs.rename(magicDest, magic2);
// the renamed file has no header
Assertions.assertThat(extractMagicFileLength(fs, magic2))
.describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + magic2)
.isEmpty();
// abort the upload, which is driven by the .pending files
// there must be 1 deleted file; during test debugging with aborted
// runs there may be more.
Assertions.assertThat(newCommitOperations()
.abortPendingUploadsUnderPath(destDir))
.describedAs("Aborting all pending uploads under %s", destDir)
.isGreaterThanOrEqualTo(1);
}
/**
* Assert that an output stream is magic.
* @param stream stream to probe.
*/
protected void assertIsMagicStream(final FSDataOutputStream stream) {
Assertions.assertThat(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT))
.describedAs("Stream capability %s in stream %s",
STREAM_CAPABILITY_MAGIC_OUTPUT, stream)
.isTrue();
}
/**
* Create a file through the magic commit mechanism.
* @param filename file to create (with __magic path.)
* @param data data to write
* @throws Exception failure
*/
private void createCommitAndVerify(String filename, byte[] data)
throws Exception {
S3AFileSystem fs = getFileSystem();
@ -295,19 +355,30 @@ public class ITestCommitOperations extends AbstractCommitITest {
fs.delete(destFile.getParent(), true);
Path magicDest = makeMagic(destFile);
assertPathDoesNotExist("Magic file should not exist", magicDest);
long dataSize = data != null ? data.length : 0;
try(FSDataOutputStream stream = fs.create(magicDest, true)) {
assertTrue(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT));
if (data != null && data.length > 0) {
assertIsMagicStream(stream);
if (dataSize > 0) {
stream.write(data);
}
stream.close();
}
FileStatus status = getFileStatusEventually(fs, magicDest,
CONSISTENCY_WAIT);
assertEquals("Non empty marker file: " + status, 0, status.getLen());
assertEquals("Magic marker file is not zero bytes: " + status,
0, 0);
Assertions.assertThat(extractMagicFileLength(fs,
magicDest))
.describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + magicDest)
.isNotEmpty()
.hasValue(dataSize);
commit(filename, destFile, HIGH_THROTTLE, 0);
verifyFileContents(fs, destFile, data);
// the destination file doesn't have the attribute
Assertions.assertThat(extractMagicFileLength(fs,
destFile))
.describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + destFile)
.isEmpty();
}
/**

View File

@ -20,17 +20,21 @@ package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitProtocol;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.CommitOperations;
import org.apache.hadoop.fs.s3a.commit.CommitUtils;
import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection;
import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl;
@ -39,6 +43,7 @@ import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.hamcrest.CoreMatchers.containsString;
@ -107,18 +112,44 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
}
protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
protected void validateTaskAttemptPathDuringWrite(Path p,
final long expectedLength) throws IOException {
String pathStr = p.toString();
assertTrue("not magic " + pathStr,
pathStr.contains(MAGIC));
assertPathDoesNotExist("task attempt visible", p);
}
protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
FileStatus st = getFileSystem().getFileStatus(p);
assertEquals("file length in " + st, 0, st.getLen());
Path pendingFile = new Path(p.toString() + PENDING_SUFFIX);
protected void validateTaskAttemptPathAfterWrite(Path marker,
final long expectedLength) throws IOException {
// the pending file exists
Path pendingFile = new Path(marker.toString() + PENDING_SUFFIX);
assertPathExists("pending file", pendingFile);
S3AFileSystem fs = getFileSystem();
// THIS SEQUENCE MUST BE RUN IN ORDER ON A S3GUARDED
// STORE
// if you list the parent dir and find the marker, it
// is really 0 bytes long
String name = marker.getName();
List<LocatedFileStatus> filtered = listAndFilter(fs,
marker.getParent(), false,
(path) -> path.getName().equals(name));
Assertions.assertThat(filtered)
.hasSize(1);
Assertions.assertThat(filtered.get(0))
.matches(lst -> lst.getLen() == 0,
"Listing should return 0 byte length");
// marker file is empty
FileStatus st = fs.getFileStatus(marker);
assertEquals("file length in " + st, 0, st.getLen());
// xattr header
Assertions.assertThat(CommitOperations.extractMagicFileLength(fs,
marker))
.describedAs("XAttribute " + XA_MAGIC_MARKER)
.isNotEmpty()
.hasValue(expectedLength);
}
/**

View File

@ -23,6 +23,7 @@ import java.util.UUID;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@ -113,14 +114,20 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
IOException.class);
}
protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
protected void validateTaskAttemptPathDuringWrite(Path p,
final long expectedLength) throws IOException {
// this is expected to be local FS
ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p);
}
protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
protected void validateTaskAttemptPathAfterWrite(Path p,
final long expectedLength) throws IOException {
// this is expected to be local FS
ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p);
// this is expected to be local FS
FileSystem localFS = getLocalFS();
ContractTestUtils.assertPathExists(localFS, "task attempt", p);
FileStatus st = localFS.getFileStatus(p);
assertEquals("file length in " + st, expectedLength, st.getLen());
}
protected FileSystem getLocalFS() throws IOException {

View File

@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_OP_XATTR_LIST;
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_MAP;
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_APPLICATION_XML;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_LENGTH;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_TYPE;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_STANDARD_HEADERS;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.CREATE_FILE_OVERWRITE;
/**
* Invoke XAttr API calls against objects in S3 and validate header
* extraction.
*/
public class ITestXAttrCost extends AbstractS3ACostTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestXAttrCost.class);
private static final int GET_METADATA_ON_OBJECT = 1;
private static final int GET_METADATA_ON_DIR = GET_METADATA_ON_OBJECT * 2;
public ITestXAttrCost() {
// no parameterization here
super(false, true, false);
}
@Test
public void testXAttrRoot() throws Throwable {
describe("Test xattr on root");
Path root = new Path("/");
S3AFileSystem fs = getFileSystem();
Map<String, byte[]> xAttrs = verifyMetrics(
() -> fs.getXAttrs(root),
with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_OBJECT));
logXAttrs(xAttrs);
List<String> headerList = verifyMetrics(() ->
fs.listXAttrs(root),
with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_OBJECT));
// verify this contains all the standard markers,
// but not the magic marker header
Assertions.assertThat(headerList)
.describedAs("Headers on root object")
.containsOnly(
XA_CONTENT_LENGTH,
XA_CONTENT_TYPE);
assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
.isEqualTo(CONTENT_TYPE_APPLICATION_XML);
}
/**
* Log the attributes as strings.
* @param xAttrs map of attributes
*/
private void logXAttrs(final Map<String, byte[]> xAttrs) {
xAttrs.forEach((k, v) ->
LOG.info("{} has bytes[{}] => \"{}\"",
k, v.length, decodeBytes(v)));
}
@Test
public void testXAttrFile() throws Throwable {
describe("Test xattr on a file");
Path testFile = methodPath();
create(testFile, true, CREATE_FILE_OVERWRITE);
S3AFileSystem fs = getFileSystem();
Map<String, byte[]> xAttrs = verifyMetrics(() ->
fs.getXAttrs(testFile),
with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_OBJECT));
logXAttrs(xAttrs);
assertHeaderEntry(xAttrs, XA_CONTENT_LENGTH)
.isEqualTo("0");
// get the list of supported headers
List<String> headerList = verifyMetrics(
() -> fs.listXAttrs(testFile),
with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_OBJECT));
// verify this contains all the standard markers,
// but not the magic marker header
Assertions.assertThat(headerList)
.describedAs("Supported headers")
.containsAnyElementsOf(Arrays.asList(XA_STANDARD_HEADERS));
// ask for one header and validate its value
byte[] bytes = verifyMetrics(() ->
fs.getXAttr(testFile, XA_CONTENT_LENGTH),
with(INVOCATION_XATTR_GET_NAMED, GET_METADATA_ON_OBJECT));
assertHeader(XA_CONTENT_LENGTH, bytes)
.isEqualTo("0");
assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
.isEqualTo(CONTENT_TYPE_OCTET_STREAM);
}
/**
* Directory attributes can be retrieved, but they take two HEAD requests.
* @throws Throwable
*/
@Test
public void testXAttrDir() throws Throwable {
describe("Test xattr on a dir");
S3AFileSystem fs = getFileSystem();
Path dir = methodPath();
fs.mkdirs(dir);
Map<String, byte[]> xAttrs = verifyMetrics(() ->
fs.getXAttrs(dir),
with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_DIR));
logXAttrs(xAttrs);
assertHeaderEntry(xAttrs, XA_CONTENT_LENGTH)
.isEqualTo("0");
// get the list of supported headers
List<String> headerList = verifyMetrics(
() -> fs.listXAttrs(dir),
with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_DIR));
// verify this contains all the standard markers,
// but not the magic marker header
Assertions.assertThat(headerList)
.describedAs("Supported headers")
.containsAnyElementsOf(Arrays.asList(XA_STANDARD_HEADERS));
// ask for one header and validate its value
byte[] bytes = verifyMetrics(() ->
fs.getXAttr(dir, XA_CONTENT_LENGTH),
with(INVOCATION_XATTR_GET_NAMED, GET_METADATA_ON_DIR));
assertHeader(XA_CONTENT_LENGTH, bytes)
.isEqualTo("0");
assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
.isEqualTo(CONTENT_TYPE_OCTET_STREAM);
}
/**
* When the operations are called on a missing path, FNFE is
* raised and only one attempt is made to retry the operation.
*/
@Test
public void testXAttrMissingFile() throws Throwable {
describe("Test xattr on a missing path");
Path testFile = methodPath();
S3AFileSystem fs = getFileSystem();
int getMetadataOnMissingFile = GET_METADATA_ON_DIR;
verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
fs.getXAttrs(testFile),
with(INVOCATION_XATTR_GET_MAP, getMetadataOnMissingFile));
verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
fs.getXAttr(testFile, XA_CONTENT_LENGTH),
with(INVOCATION_XATTR_GET_NAMED, getMetadataOnMissingFile));
verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
fs.listXAttrs(testFile),
with(INVOCATION_OP_XATTR_LIST, getMetadataOnMissingFile));
}
/**
* Generate an assert on a named header in the map.
* @param xAttrs attribute map
* @param key header key
* @return the assertion
*/
private AbstractStringAssert<?> assertHeaderEntry(
Map<String, byte[]> xAttrs, String key) {
return assertHeader(key, xAttrs.get(key));
}
/**
* Create an assertion on the header; check for the bytes
* being non-null/empty and then returns the decoded values
* as a string assert.
* @param key header key (for error)
* @param bytes value
* @return the assertion
*/
private AbstractStringAssert<?> assertHeader(final String key,
final byte[] bytes) {
String decoded = decodeBytes(bytes);
return Assertions.assertThat(decoded)
.describedAs("xattr %s decoded to: %s", key, decoded)
.isNotNull()
.isNotEmpty();
}
}

View File

@ -0,0 +1,313 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.test.OperationTrackingStore;
import org.apache.hadoop.test.HadoopTestBase;
import static java.lang.System.currentTimeMillis;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_LAST_MODIFIED;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_LENGTH;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.encodeBytes;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.extractXAttrLongValue;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Unit tests of header processing logic in {@link HeaderProcessing}.
* Builds up a context accessor where the path
* defined in {@link #MAGIC_PATH} exists and returns object metadata.
*
*/
public class TestHeaderProcessing extends HadoopTestBase {
private static final XAttrContextAccessor CONTEXT_ACCESSORS
= new XAttrContextAccessor();
public static final String VALUE = "abcdeFGHIJ123!@##&82;";
public static final long FILE_LENGTH = 1024;
private static final String FINAL_FILE = "s3a://bucket/dest/output.csv";
private StoreContext context;
private HeaderProcessing headerProcessing;
private static final String MAGIC_KEY
= "dest/__magic/job1/ta1/__base/output.csv";
private static final String MAGIC_FILE
= "s3a://bucket/" + MAGIC_KEY;
private static final Path MAGIC_PATH =
new Path(MAGIC_FILE);
public static final long MAGIC_LEN = 4096L;
/**
* All the XAttrs which are built up.
*/
private static final String[] RETRIEVED_XATTRS = {
XA_MAGIC_MARKER,
XA_CONTENT_LENGTH,
XA_LAST_MODIFIED
};
@Before
public void setup() throws Exception {
CONTEXT_ACCESSORS.len = FILE_LENGTH;
CONTEXT_ACCESSORS.userHeaders.put(
X_HEADER_MAGIC_MARKER,
Long.toString(MAGIC_LEN));
context = S3ATestUtils.createMockStoreContext(true,
new OperationTrackingStore(), CONTEXT_ACCESSORS);
headerProcessing = new HeaderProcessing(context);
}
@Test
public void testByteRoundTrip() throws Throwable {
Assertions.assertThat(decodeBytes(encodeBytes(VALUE)))
.describedAs("encoding of " + VALUE)
.isEqualTo(VALUE);
}
@Test
public void testGetMarkerXAttr() throws Throwable {
assertAttributeHasValue(XA_MAGIC_MARKER, MAGIC_LEN);
}
@Test
public void testGetLengthXAttr() throws Throwable {
assertAttributeHasValue(XA_CONTENT_LENGTH, FILE_LENGTH);
}
/**
* Last modified makes it through.
*/
@Test
public void testGetDateXAttr() throws Throwable {
Assertions.assertThat(
decodeBytes(headerProcessing.getXAttr(MAGIC_PATH,
XA_LAST_MODIFIED)))
.describedAs("XAttribute " + XA_LAST_MODIFIED)
.isEqualTo(CONTEXT_ACCESSORS.date.toString());
}
/**
* The API calls on unknown paths raise 404s.
*/
@Test
public void test404() throws Throwable {
intercept(FileNotFoundException.class, () ->
headerProcessing.getXAttr(new Path(FINAL_FILE), XA_MAGIC_MARKER));
}
/**
* This call returns all the attributes which aren't null, including
* all the standard HTTP headers.
*/
@Test
public void testGetAllXAttrs() throws Throwable {
Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH);
Assertions.assertThat(xAttrs.keySet())
.describedAs("Attribute keys")
.contains(RETRIEVED_XATTRS);
}
/**
* This call returns all the attributes which aren't null, including
* all the standard HTTP headers.
*/
@Test
public void testListXAttrKeys() throws Throwable {
List<String> xAttrs = headerProcessing.listXAttrs(MAGIC_PATH);
Assertions.assertThat(xAttrs)
.describedAs("Attribute keys")
.contains(RETRIEVED_XATTRS);
}
/**
* Filtering is on attribute key, not header.
*/
@Test
public void testGetFilteredXAttrs() throws Throwable {
Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH,
Lists.list(XA_MAGIC_MARKER, XA_CONTENT_LENGTH, "unknown"));
Assertions.assertThat(xAttrs.keySet())
.describedAs("Attribute keys")
.containsExactlyInAnyOrder(XA_MAGIC_MARKER, XA_CONTENT_LENGTH);
// and the values are good
assertLongAttributeValue(
XA_MAGIC_MARKER,
xAttrs.get(XA_MAGIC_MARKER),
MAGIC_LEN);
assertLongAttributeValue(
XA_CONTENT_LENGTH,
xAttrs.get(XA_CONTENT_LENGTH),
FILE_LENGTH);
}
/**
* An empty list of keys results in empty results.
*/
@Test
public void testFilterEmptyXAttrs() throws Throwable {
Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH,
Lists.list());
Assertions.assertThat(xAttrs.keySet())
.describedAs("Attribute keys")
.isEmpty();
}
/**
* Add two headers to the metadata, then verify that
* the magic marker header is copied, but not the other header.
*/
@Test
public void testMetadataCopySkipsMagicAttribute() throws Throwable {
final String owner = "x-header-owner";
final String root = "root";
CONTEXT_ACCESSORS.userHeaders.put(owner, root);
final ObjectMetadata source = context.getContextAccessors()
.getObjectMetadata(MAGIC_KEY);
final Map<String, String> sourceUserMD = source.getUserMetadata();
Assertions.assertThat(sourceUserMD.get(owner))
.describedAs("owner header in copied MD")
.isEqualTo(root);
ObjectMetadata dest = new ObjectMetadata();
headerProcessing.cloneObjectMetadata(source, dest);
Assertions.assertThat(dest.getUserMetadata().get(X_HEADER_MAGIC_MARKER))
.describedAs("Magic marker header in copied MD")
.isNull();
Assertions.assertThat(dest.getUserMetadata().get(owner))
.describedAs("owner header in copied MD")
.isEqualTo(root);
}
/**
* Assert that an XAttr has a specific long value.
* @param key attribute key
* @param bytes bytes of the attribute.
* @param expected expected numeric value.
*/
private void assertLongAttributeValue(
final String key,
final byte[] bytes,
final long expected) {
Assertions.assertThat(extractXAttrLongValue(bytes))
.describedAs("XAttribute " + key)
.isNotEmpty()
.hasValue(expected);
}
/**
* Assert that a retrieved XAttr has a specific long value.
* @param key attribute key
* @param expected expected numeric value.
*/
protected void assertAttributeHasValue(final String key,
final long expected)
throws IOException {
assertLongAttributeValue(
key,
headerProcessing.getXAttr(MAGIC_PATH, key),
expected);
}
/**
* Context accessor with XAttrs returned for the {@link #MAGIC_PATH}
* path.
*/
private static final class XAttrContextAccessor
implements ContextAccessors {
private final Map<String, String> userHeaders = new HashMap<>();
private long len;
private Date date = new Date(currentTimeMillis());
@Override
public Path keyToPath(final String key) {
return new Path("s3a://bucket/" + key);
}
@Override
public String pathToKey(final Path path) {
// key is path with leading / stripped.
String key = path.toUri().getPath();
return key.length() > 1 ? key.substring(1) : key;
}
@Override
public File createTempFile(final String prefix, final long size)
throws IOException {
throw new UnsupportedOperationException("unsppported");
}
@Override
public String getBucketLocation() throws IOException {
return null;
}
@Override
public Path makeQualified(final Path path) {
return path;
}
@Override
public ObjectMetadata getObjectMetadata(final String key)
throws IOException {
if (MAGIC_KEY.equals(key)) {
ObjectMetadata omd = new ObjectMetadata();
omd.setUserMetadata(userHeaders);
omd.setContentLength(len);
omd.setLastModified(date);
return omd;
} else {
throw new FileNotFoundException(key);
}
}
public void setHeader(String key, String val) {
userHeaders.put(key, val);
}
}
}

View File

@ -29,6 +29,7 @@ import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
@ -226,7 +227,8 @@ public class TestPartialDeleteFailures {
}
private static class MinimalContextAccessor implements ContextAccessors {
private static final class MinimalContextAccessor
implements ContextAccessors {
@Override
public Path keyToPath(final String key) {
@ -253,6 +255,12 @@ public class TestPartialDeleteFailures {
public Path makeQualified(final Path path) {
return path;
}
@Override
public ObjectMetadata getObjectMetadata(final String key)
throws IOException {
return new ObjectMetadata();
}
}
}