HADOOP-17330. Backport HADOOP-16005 NativeAzureFileSystem does not support setXAttr to branch-3.2. Contributed by Sally Zuo.

This commit is contained in:
Chen Liang 2020-10-28 15:39:18 -07:00
parent d0104e72c5
commit 576f99ba40
7 changed files with 396 additions and 13 deletions

View File

@ -29,6 +29,8 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.util.Calendar;
import java.util.Date;
@ -247,6 +249,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final int DEFAULT_CONCURRENT_WRITES = 8;
private static final Charset METADATA_ENCODING = StandardCharsets.UTF_8;
// Concurrent reads reads of data written out of band are disable by default.
//
private static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
@ -1662,17 +1666,30 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
removeMetadataAttribute(blob, OLD_IS_FOLDER_METADATA_KEY);
}
private static void storeLinkAttribute(CloudBlobWrapper blob,
String linkTarget) throws UnsupportedEncodingException {
// We have to URL encode the link attribute as the link URI could
private static String encodeMetadataAttribute(String value) throws UnsupportedEncodingException {
// We have to URL encode the attribute as it could
// have URI special characters which unless encoded will result
// in 403 errors from the server. This is due to metadata properties
// being sent in the HTTP header of the request which is in turn used
// on the server side to authorize the request.
String encodedLinkTarget = null;
if (linkTarget != null) {
encodedLinkTarget = URLEncoder.encode(linkTarget, "UTF-8");
}
return value == null ? null : URLEncoder.encode(value, METADATA_ENCODING.name());
}
private static String decodeMetadataAttribute(String encoded) throws UnsupportedEncodingException {
return encoded == null ? null : URLDecoder.decode(encoded, METADATA_ENCODING.name());
}
private static String ensureValidAttributeName(String attribute) {
// Attribute names must be valid C# identifiers so we have to
// convert the namespace dots (e.g. "user.something") in the
// attribute names. Using underscores here to be consistent with
// the constant metadata keys defined earlier in the file
return attribute.replace('.', '_');
}
private static void storeLinkAttribute(CloudBlobWrapper blob,
String linkTarget) throws UnsupportedEncodingException {
String encodedLinkTarget = encodeMetadataAttribute(linkTarget);
storeMetadataAttribute(blob,
LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
encodedLinkTarget);
@ -1686,11 +1703,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
String encodedLinkTarget = getMetadataAttribute(blob,
LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY);
String linkTarget = null;
if (encodedLinkTarget != null) {
linkTarget = URLDecoder.decode(encodedLinkTarget, "UTF-8");
}
return linkTarget;
return decodeMetadataAttribute(encodedLinkTarget);
}
private static boolean retrieveFolderAttribute(CloudBlobWrapper blob) {
@ -2211,6 +2224,36 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
}
@Override
public byte[] retrieveAttribute(String key, String attribute) throws IOException {
try {
checkContainer(ContainerAccessType.PureRead);
CloudBlobWrapper blob = getBlobReference(key);
blob.downloadAttributes(getInstrumentedContext());
String value = getMetadataAttribute(blob, ensureValidAttributeName(attribute));
value = decodeMetadataAttribute(value);
return value == null ? null : value.getBytes(METADATA_ENCODING);
} catch (Exception e) {
throw new AzureException(e);
}
}
@Override
public void storeAttribute(String key, String attribute, byte[] value) throws IOException {
try {
checkContainer(ContainerAccessType.ReadThenWrite);
CloudBlobWrapper blob = getBlobReference(key);
blob.downloadAttributes(getInstrumentedContext());
String encodedValue = encodeMetadataAttribute(new String(value, METADATA_ENCODING));
storeMetadataAttribute(blob, ensureValidAttributeName(attribute), encodedValue);
blob.uploadMetadata(getInstrumentedContext());
} catch (Exception e) {
throw new AzureException(e);
}
}
@Override
public InputStream retrieve(String key) throws AzureException, IOException {
return retrieve(key, 0);

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.azure.security.Constants;
@ -3563,6 +3564,76 @@ public class NativeAzureFileSystem extends FileSystem {
}
}
/**
* Set the value of an attribute for a path.
*
* @param path The path on which to set the attribute
* @param xAttrName The attribute to set
* @param value The byte value of the attribute to set (encoded in utf-8)
* @param flag The mode in which to set the attribute
* @throws IOException If there was an issue setting the attribute on Azure
*/
@Override
public void setXAttr(Path path, String xAttrName, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException {
Path absolutePath = makeAbsolute(path);
performAuthCheck(absolutePath, WasbAuthorizationOperations.WRITE, "setXAttr", absolutePath);
String key = pathToKey(absolutePath);
FileMetadata metadata;
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException("File " + path + " doesn't exists.");
}
throw ex;
}
if (metadata == null) {
throw new FileNotFoundException("File doesn't exist: " + path);
}
boolean xAttrExists = store.retrieveAttribute(key, xAttrName) != null;
XAttrSetFlag.validate(xAttrName, xAttrExists, flag);
store.storeAttribute(key, xAttrName, value);
}
/**
* Get the value of an attribute for a path.
*
* @param path The path on which to get the attribute
* @param xAttrName The attribute to get
* @return The bytes of the attribute's value (encoded in utf-8)
* or null if the attribute does not exist
* @throws IOException If there was an issue getting the attribute from Azure
*/
@Override
public byte[] getXAttr(Path path, String xAttrName) throws IOException {
Path absolutePath = makeAbsolute(path);
performAuthCheck(absolutePath, WasbAuthorizationOperations.READ, "getXAttr", absolutePath);
String key = pathToKey(absolutePath);
FileMetadata metadata;
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException("File " + path + " doesn't exists.");
}
throw ex;
}
if (metadata == null) {
throw new FileNotFoundException("File doesn't exist: " + path);
}
return store.retrieveAttribute(key, xAttrName);
}
/**
* Is the user allowed?
* <ol>

View File

@ -76,6 +76,10 @@ interface NativeFileSystemStore {
void changePermissionStatus(String key, PermissionStatus newPermission)
throws AzureException;
byte[] retrieveAttribute(String key, String attribute) throws IOException;
void storeAttribute(String key, String attribute, byte[] value) throws IOException;
/**
* API to delete a blob in the back end azure storage.
* @param key - key to the blob being deleted.

View File

@ -26,6 +26,7 @@ import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Hashtable;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
@ -56,6 +57,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
@ -628,6 +630,83 @@ public class AzureBlobFileSystem extends FileSystem {
}
}
/**
* Set the value of an attribute for a path.
*
* @param path The path on which to set the attribute
* @param name The attribute to set
* @param value The byte value of the attribute to set (encoded in latin-1)
* @param flag The mode in which to set the attribute
* @throws IOException If there was an issue setting the attribute on Azure
* @throws IllegalArgumentException If name is null or empty or if value is null
*/
@Override
public void setXAttr(final Path path, final String name, final byte[] value, final EnumSet<XAttrSetFlag> flag)
throws IOException {
LOG.debug("AzureBlobFileSystem.setXAttr path: {}", path);
if (name == null || name.isEmpty() || value == null) {
throw new IllegalArgumentException("A valid name and value must be specified.");
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.READ_WRITE, qualifiedPath);
try {
Hashtable<String, String> properties = abfsStore.getPathStatus(path);
String xAttrName = ensureValidAttributeName(name);
boolean xAttrExists = properties.containsKey(xAttrName);
XAttrSetFlag.validate(name, xAttrExists, flag);
String xAttrValue = abfsStore.decodeAttribute(value);
properties.put(xAttrName, xAttrValue);
abfsStore.setPathProperties(path, properties);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
}
/**
* Get the value of an attribute for a path.
*
* @param path The path on which to get the attribute
* @param name The attribute to get
* @return The bytes of the attribute's value (encoded in latin-1)
* or null if the attribute does not exist
* @throws IOException If there was an issue getting the attribute from Azure
* @throws IllegalArgumentException If name is null or empty
*/
@Override
public byte[] getXAttr(final Path path, final String name)
throws IOException {
LOG.debug("AzureBlobFileSystem.getXAttr path: {}", path);
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("A valid name must be specified.");
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.READ, qualifiedPath);
byte[] value = null;
try {
Hashtable<String, String> properties = abfsStore.getPathStatus(path);
String xAttrName = ensureValidAttributeName(name);
if (properties.containsKey(xAttrName)) {
String xAttrValue = properties.get(xAttrName);
value = abfsStore.encodeAttribute(xAttrValue);
}
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
return value;
}
private static String ensureValidAttributeName(String attribute) {
// to avoid HTTP 400 Bad Request, InvalidPropertyName
return attribute.replace('.', '_');
}
/**
* Set permission of a path.
*

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URI;
@ -145,8 +146,12 @@ public class AzureBlobFileSystemStore {
} catch (IllegalAccessException exception) {
throw new FileSystemOperationUnhandledException(exception);
}
LOG.trace("AbfsConfiguration init complete");
this.userGroupInformation = UserGroupInformation.getCurrentUser();
this.userName = userGroupInformation.getShortUserName();
LOG.trace("UGI init complete");
if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
try {
this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
@ -158,6 +163,7 @@ public class AzureBlobFileSystemStore {
//Provide a default group name
this.primaryUserGroup = userName;
}
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
@ -167,6 +173,7 @@ public class AzureBlobFileSystemStore {
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
initializeClient(uri, fileSystemName, accountName, useHttps);
this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration());
LOG.trace("IdentityTransformer init complete");
}
/**
@ -183,6 +190,14 @@ public class AzureBlobFileSystemStore {
return this.primaryUserGroup;
}
byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
return value.getBytes(XMS_PROPERTIES_ENCODING);
}
String decodeAttribute(byte[] value) throws UnsupportedEncodingException {
return new String(value, XMS_PROPERTIES_ENCODING);
}
private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
final String authority = uri.getRawAuthority();
if (null == authority) {
@ -288,6 +303,7 @@ public class AzureBlobFileSystemStore {
public void setFilesystemProperties(final Hashtable<String, String> properties)
throws AzureBlobFileSystemException {
if (properties == null || properties.isEmpty()) {
LOG.trace("setFilesystemProperties no properties present");
return;
}
@ -1107,6 +1123,7 @@ public class AzureBlobFileSystemStore {
AccessTokenProvider tokenProvider = null;
if (abfsConfiguration.getAuthType(accountName) == AuthType.SharedKey) {
LOG.trace("Fetching SharedKey credentials");
int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
if (dotIndex <= 0) {
throw new InvalidUriException(
@ -1115,12 +1132,15 @@ public class AzureBlobFileSystemStore {
creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
abfsConfiguration.getStorageAccountKey());
} else {
LOG.trace("Fetching token provider");
tokenProvider = abfsConfiguration.getTokenProvider();
}
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
LOG.trace("Initializing AbfsClient for {}", baseUrl);
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
tokenProvider, abfsPerfTracker);
LOG.trace("AbfsClient init complete");
}
private String getOctalNotation(FsPermission fsPermission) {

View File

@ -22,10 +22,12 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.EnumSet;
import java.util.TimeZone;
import org.apache.commons.logging.Log;
@ -37,6 +39,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
@ -51,6 +54,7 @@ import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.readStringFr
import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToFile;
import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToStream;
import static org.apache.hadoop.test.GenericTestUtils.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/*
* Tests the Native Azure file system (WASB) against an actual blob store if
@ -65,6 +69,10 @@ public abstract class NativeAzureFileSystemBaseTest
private final long modifiedTimeErrorMargin = 5 * 1000; // Give it +/-5 seconds
private static final short READ_WRITE_PERMISSIONS = 644;
private static final EnumSet<XAttrSetFlag> CREATE_FLAG = EnumSet.of(XAttrSetFlag.CREATE);
private static final EnumSet<XAttrSetFlag> REPLACE_FLAG = EnumSet.of(XAttrSetFlag.REPLACE);
public static final Log LOG = LogFactory.getLog(NativeAzureFileSystemBaseTest.class);
protected NativeAzureFileSystem fs;
@ -117,6 +125,60 @@ public abstract class NativeAzureFileSystemBaseTest
fs.delete(testFile, true);
}
@Test
public void testSetGetXAttr() throws Exception {
byte[] attributeValue1 = "hi".getBytes(StandardCharsets.UTF_8);
byte[] attributeValue2 = "你好".getBytes(StandardCharsets.UTF_8);
String attributeName1 = "user.asciiAttribute";
String attributeName2 = "user.unicodeAttribute";
Path testFile = methodPath();
// after creating a file, the xAttr should not be present
createEmptyFile(testFile, FsPermission.createImmutable(READ_WRITE_PERMISSIONS));
assertNull(fs.getXAttr(testFile, attributeName1));
// after setting the xAttr on the file, the value should be retrievable
fs.setXAttr(testFile, attributeName1, attributeValue1);
assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
// after setting a second xAttr on the file, the first xAttr values should not be overwritten
fs.setXAttr(testFile, attributeName2, attributeValue2);
assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName2));
}
@Test
public void testSetGetXAttrCreateReplace() throws Exception {
byte[] attributeValue = "one".getBytes(StandardCharsets.UTF_8);
String attributeName = "user.someAttribute";
Path testFile = methodPath();
// after creating a file, it must be possible to create a new xAttr
createEmptyFile(testFile, FsPermission.createImmutable(READ_WRITE_PERMISSIONS));
fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
assertArrayEquals(attributeValue, fs.getXAttr(testFile, attributeName));
// however after the xAttr is created, creating it again must fail
intercept(IOException.class, () -> fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG));
}
@Test
public void testSetGetXAttrReplace() throws Exception {
byte[] attributeValue1 = "one".getBytes(StandardCharsets.UTF_8);
byte[] attributeValue2 = "two".getBytes(StandardCharsets.UTF_8);
String attributeName = "user.someAttribute";
Path testFile = methodPath();
// after creating a file, it must not be possible to replace an xAttr
createEmptyFile(testFile, FsPermission.createImmutable(READ_WRITE_PERMISSIONS));
intercept(IOException.class, () -> fs.setXAttr(testFile, attributeName, attributeValue1, REPLACE_FLAG));
// however after the xAttr is created, replacing it must succeed
fs.setXAttr(testFile, attributeName, attributeValue1, CREATE_FLAG);
fs.setXAttr(testFile, attributeName, attributeValue2, REPLACE_FLAG);
assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName));
}
@Test
public void testStoreDeleteFolder() throws Exception {
Path testFolder = methodPath();

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.junit.Assume;
import org.junit.Test;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test attribute operations.
*/
public class ITestAzureBlobFileSystemAttributes extends AbstractAbfsIntegrationTest {
private static final EnumSet<XAttrSetFlag> CREATE_FLAG = EnumSet.of(XAttrSetFlag.CREATE);
private static final EnumSet<XAttrSetFlag> REPLACE_FLAG = EnumSet.of(XAttrSetFlag.REPLACE);
public ITestAzureBlobFileSystemAttributes() throws Exception {
super();
}
@Test
public void testSetGetXAttr() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
Assume.assumeTrue(fs.getIsNamespaceEnabled());
byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("hi");
byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("你好");
String attributeName1 = "user.asciiAttribute";
String attributeName2 = "user.unicodeAttribute";
Path testFile = path("setGetXAttr");
// after creating a file, the xAttr should not be present
touch(testFile);
assertNull(fs.getXAttr(testFile, attributeName1));
// after setting the xAttr on the file, the value should be retrievable
fs.setXAttr(testFile, attributeName1, attributeValue1);
assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
// after setting a second xAttr on the file, the first xAttr values should not be overwritten
fs.setXAttr(testFile, attributeName2, attributeValue2);
assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName2));
}
@Test
public void testSetGetXAttrCreateReplace() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
Assume.assumeTrue(fs.getIsNamespaceEnabled());
byte[] attributeValue = fs.getAbfsStore().encodeAttribute("one");
String attributeName = "user.someAttribute";
Path testFile = path("createReplaceXAttr");
// after creating a file, it must be possible to create a new xAttr
touch(testFile);
fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
assertArrayEquals(attributeValue, fs.getXAttr(testFile, attributeName));
// however after the xAttr is created, creating it again must fail
intercept(IOException.class, () -> fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG));
}
@Test
public void testSetGetXAttrReplace() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
Assume.assumeTrue(fs.getIsNamespaceEnabled());
byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("one");
byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("two");
String attributeName = "user.someAttribute";
Path testFile = path("replaceXAttr");
// after creating a file, it must not be possible to replace an xAttr
intercept(IOException.class, () -> {
touch(testFile);
fs.setXAttr(testFile, attributeName, attributeValue1, REPLACE_FLAG);
});
// however after the xAttr is created, replacing it must succeed
fs.setXAttr(testFile, attributeName, attributeValue1, CREATE_FLAG);
fs.setXAttr(testFile, attributeName, attributeValue2, REPLACE_FLAG);
assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName));
}
}