HADOOP-16269. ABFS: add listFileStatus with StartFrom.
Author: Da Zhou
This commit is contained in:
parent
9b0aace1e6
commit
3418bbbb59
|
@ -32,6 +32,7 @@ import java.nio.charset.CharacterCodingException;
|
|||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.CharsetDecoder;
|
||||
import java.nio.charset.CharsetEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
|
@ -47,6 +48,7 @@ import java.util.Set;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -81,6 +83,7 @@ import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
|||
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
|
||||
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.Base64;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
|
@ -92,7 +95,17 @@ import org.apache.http.client.utils.URIBuilder;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYPHEN;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
|
||||
|
||||
/**
|
||||
* Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
|
||||
*/
|
||||
|
@ -106,6 +119,7 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
private String userName;
|
||||
private String primaryUserGroup;
|
||||
private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'";
|
||||
private static final String TOKEN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'";
|
||||
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
|
||||
private static final int LIST_MAX_RESULTS = 500;
|
||||
|
||||
|
@ -522,15 +536,43 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
eTag);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param path The list path.
|
||||
* @return the entries in the path.
|
||||
* */
|
||||
public FileStatus[] listStatus(final Path path) throws IOException {
|
||||
LOG.debug("listStatus filesystem: {} path: {}",
|
||||
return listStatus(path, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param path Path the list path.
|
||||
* @param startFrom the entry name that list results should start with.
|
||||
* For example, if folder "/folder" contains four files: "afile", "bfile", "hfile", "ifile".
|
||||
* Then listStatus(Path("/folder"), "hfile") will return "/folder/hfile" and "folder/ifile"
|
||||
* Notice that if startFrom is a non-existent entry name, then the list response contains
|
||||
* all entries after this non-existent entry in lexical order:
|
||||
* listStatus(Path("/folder"), "cfile") will return "/folder/hfile" and "/folder/ifile".
|
||||
*
|
||||
* @return the entries in the path start from "startFrom" in lexical order.
|
||||
* */
|
||||
@InterfaceStability.Unstable
|
||||
public FileStatus[] listStatus(final Path path, final String startFrom) throws IOException {
|
||||
LOG.debug("listStatus filesystem: {} path: {}, startFrom: {}",
|
||||
client.getFileSystem(),
|
||||
path);
|
||||
path,
|
||||
startFrom);
|
||||
|
||||
String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path);
|
||||
final String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path);
|
||||
String continuation = null;
|
||||
ArrayList<FileStatus> fileStatuses = new ArrayList<>();
|
||||
|
||||
// generate continuation token if a valid startFrom is provided.
|
||||
if (startFrom != null && !startFrom.isEmpty()) {
|
||||
continuation = getIsNamespaceEnabled()
|
||||
? generateContinuationTokenForXns(startFrom)
|
||||
: generateContinuationTokenForNonXns(path.isRoot() ? ROOT_PATH : relativePath, startFrom);
|
||||
}
|
||||
|
||||
ArrayList<FileStatus> fileStatuses = new ArrayList<>();
|
||||
do {
|
||||
AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation);
|
||||
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
|
||||
|
@ -583,6 +625,61 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
|
||||
}
|
||||
|
||||
// generate continuation token for xns account
|
||||
private String generateContinuationTokenForXns(final String firstEntryName) {
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(firstEntryName)
|
||||
&& !firstEntryName.startsWith(AbfsHttpConstants.ROOT_PATH),
|
||||
"startFrom must be a dir/file name and it can not be a full path");
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(firstEntryName).append("#$").append("0");
|
||||
|
||||
CRC64 crc64 = new CRC64();
|
||||
StringBuilder token = new StringBuilder();
|
||||
token.append(crc64.compute(sb.toString().getBytes(StandardCharsets.UTF_8)))
|
||||
.append(SINGLE_WHITE_SPACE)
|
||||
.append("0")
|
||||
.append(SINGLE_WHITE_SPACE)
|
||||
.append(firstEntryName);
|
||||
|
||||
return Base64.encode(token.toString().getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
// generate continuation token for non-xns account
|
||||
private String generateContinuationTokenForNonXns(final String path, final String firstEntryName) {
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(firstEntryName)
|
||||
&& !firstEntryName.startsWith(AbfsHttpConstants.ROOT_PATH),
|
||||
"startFrom must be a dir/file name and it can not be a full path");
|
||||
|
||||
// Notice: non-xns continuation token requires full path (first "/" is not included) for startFrom
|
||||
final String startFrom = (path.isEmpty() || path.equals(ROOT_PATH))
|
||||
? firstEntryName
|
||||
: path + ROOT_PATH + firstEntryName;
|
||||
|
||||
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(TOKEN_DATE_PATTERN, Locale.US);
|
||||
String date = simpleDateFormat.format(new Date());
|
||||
String token = String.format("%06d!%s!%06d!%s!%06d!%s!",
|
||||
path.length(), path, startFrom.length(), startFrom, date.length(), date);
|
||||
String base64EncodedToken = Base64.encode(token.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
StringBuilder encodedTokenBuilder = new StringBuilder(base64EncodedToken.length() + 5);
|
||||
encodedTokenBuilder.append(String.format("%s!%d!", TOKEN_VERSION, base64EncodedToken.length()));
|
||||
|
||||
for (int i = 0; i < base64EncodedToken.length(); i++) {
|
||||
char current = base64EncodedToken.charAt(i);
|
||||
if (CHAR_FORWARD_SLASH == current) {
|
||||
current = CHAR_UNDERSCORE;
|
||||
} else if (CHAR_PLUS == current) {
|
||||
current = CHAR_STAR;
|
||||
} else if (CHAR_EQUALS == current) {
|
||||
current = CHAR_HYPHEN;
|
||||
}
|
||||
encodedTokenBuilder.append(current);
|
||||
}
|
||||
|
||||
return encodedTokenBuilder.toString();
|
||||
}
|
||||
|
||||
public void setOwner(final Path path, final String owner, final String group) throws
|
||||
AzureBlobFileSystemException {
|
||||
if (!getIsNamespaceEnabled()) {
|
||||
|
@ -1002,7 +1099,7 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
|
||||
FileStatus other = (FileStatus) obj;
|
||||
|
||||
if (!other.equals(this)) {// compare the path
|
||||
if (!this.getPath().equals(other.getPath())) {// compare the path
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -39,6 +39,7 @@ public final class AbfsHttpConstants {
|
|||
public static final String GET_ACCESS_CONTROL = "getAccessControl";
|
||||
public static final String GET_STATUS = "getStatus";
|
||||
public static final String DEFAULT_TIMEOUT = "90";
|
||||
public static final String TOKEN_VERSION = "2";
|
||||
|
||||
public static final String JAVA_VERSION = "java.version";
|
||||
public static final String OS_NAME = "os.name";
|
||||
|
@ -91,5 +92,13 @@ public final class AbfsHttpConstants {
|
|||
public static final String PERMISSION_FORMAT = "%04d";
|
||||
public static final String SUPER_USER = "$superuser";
|
||||
|
||||
public static final char CHAR_FORWARD_SLASH = '/';
|
||||
public static final char CHAR_EXCLAMATION_POINT = '!';
|
||||
public static final char CHAR_UNDERSCORE = '_';
|
||||
public static final char CHAR_HYPHEN = '-';
|
||||
public static final char CHAR_EQUALS = '=';
|
||||
public static final char CHAR_STAR = '*';
|
||||
public static final char CHAR_PLUS = '+';
|
||||
|
||||
private AbfsHttpConstants() {}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.azurebfs.utils;
|
||||
|
||||
/**
|
||||
* CRC64 implementation for AzureBlobFileSystem.
|
||||
*/
|
||||
public class CRC64 {
|
||||
|
||||
private static final long POLY = 0x9a6c9329ac4bc9b5L;
|
||||
private static final int TABLE_LENGTH = 256;
|
||||
private static final long[] TABLE = new long[TABLE_LENGTH];
|
||||
|
||||
private long value = -1;
|
||||
|
||||
/**
|
||||
* @param input byte arrays.
|
||||
* @return long value of the CRC-64 checksum of the data.
|
||||
* */
|
||||
public long compute(byte[] input) {
|
||||
init();
|
||||
for (int i = 0; i < input.length; i++) {
|
||||
value = TABLE[(input[i] ^ (int) value) & 0xFF] ^ (value >>> 8);
|
||||
}
|
||||
return ~value;
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize a table constructed from POLY (0x9a6c9329ac4bc9b5L).
|
||||
* */
|
||||
private void init() {
|
||||
value = -1;
|
||||
for (int n = 0; n < TABLE_LENGTH; ++n) {
|
||||
long crc = n;
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
if ((crc & 1) == 1) {
|
||||
crc = (crc >>> 1) ^ POLY;
|
||||
} else {
|
||||
crc >>>= 1;
|
||||
}
|
||||
}
|
||||
TABLE[n] = crc;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -24,7 +24,6 @@ import java.util.Hashtable;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -211,9 +210,9 @@ public abstract class AbstractAbfsIntegrationTest extends
|
|||
* @throws IOException failure during create/init.
|
||||
*/
|
||||
public AzureBlobFileSystem createFileSystem() throws IOException {
|
||||
Preconditions.checkState(abfs == null,
|
||||
"existing ABFS instance exists: %s", abfs);
|
||||
if (abfs == null) {
|
||||
abfs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
|
||||
}
|
||||
return abfs;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
/**
|
||||
* Test AzureBlobFileSystemStore listStatus with startFrom.
|
||||
* */
|
||||
@RunWith(Parameterized.class)
|
||||
public class ITestAzureBlobFileSystemStoreListStatusWithRange extends
|
||||
AbstractAbfsIntegrationTest {
|
||||
private static final boolean SUCCEED = true;
|
||||
private static final boolean FAIL = false;
|
||||
private static final String[] SORTED_ENTRY_NAMES = {"1_folder", "A0", "D01", "a+", "c0", "name5"};
|
||||
|
||||
private AzureBlobFileSystemStore store;
|
||||
private AzureBlobFileSystem fs;
|
||||
|
||||
@Parameterized.Parameter
|
||||
public String path;
|
||||
|
||||
/**
|
||||
* A valid startFrom for listFileStatus with range is a non-fully qualified dir/file name
|
||||
* */
|
||||
@Parameterized.Parameter(1)
|
||||
public String startFrom;
|
||||
|
||||
@Parameterized.Parameter(2)
|
||||
public int expectedStartIndexInArray;
|
||||
|
||||
@Parameterized.Parameter(3)
|
||||
public boolean expectedResult;
|
||||
|
||||
@Parameterized.Parameters(name = "Testing path \"{0}\", startFrom: \"{1}\", Expecting result : {3}") // Test path
|
||||
public static Iterable<Object[]> params() {
|
||||
return Arrays.asList(
|
||||
new Object[][]{
|
||||
// case 0: list in root, without range
|
||||
{"/", null, 0, SUCCEED},
|
||||
|
||||
// case 1: list in the root, start from the second file
|
||||
{"/", SORTED_ENTRY_NAMES[1], 1, SUCCEED},
|
||||
|
||||
// case 2: list in the root, invalid startFrom
|
||||
{"/", "/", -1, FAIL},
|
||||
|
||||
// case 3: list in non-root level, valid startFrom : dir name
|
||||
{"/" + SORTED_ENTRY_NAMES[2], SORTED_ENTRY_NAMES[1], 1, SUCCEED},
|
||||
|
||||
// case 4: list in non-root level, valid startFrom : file name
|
||||
{"/" + SORTED_ENTRY_NAMES[2], SORTED_ENTRY_NAMES[2], 2, SUCCEED},
|
||||
|
||||
// case 5: list in non root level, invalid startFrom
|
||||
{"/" + SORTED_ENTRY_NAMES[2], "/" + SORTED_ENTRY_NAMES[3], -1, FAIL},
|
||||
|
||||
// case 6: list using non existent startFrom, startFrom is smaller than the entries in lexical order
|
||||
// expecting return all entries
|
||||
{"/" + SORTED_ENTRY_NAMES[2], "0-non-existent", 0, SUCCEED},
|
||||
|
||||
// case 7: list using non existent startFrom, startFrom is larger than the entries in lexical order
|
||||
// expecting return 0 entries
|
||||
{"/" + SORTED_ENTRY_NAMES[2], "z-non-existent", -1, SUCCEED},
|
||||
|
||||
// case 8: list using non existent startFrom, startFrom is in the range
|
||||
{"/" + SORTED_ENTRY_NAMES[2], "A1", 2, SUCCEED}
|
||||
});
|
||||
}
|
||||
|
||||
public ITestAzureBlobFileSystemStoreListStatusWithRange() throws Exception {
|
||||
super();
|
||||
if (this.getFileSystem() == null) {
|
||||
super.createFileSystem();
|
||||
}
|
||||
fs = this.getFileSystem();
|
||||
store = fs.getAbfsStore();
|
||||
prepareTestFiles();
|
||||
// Sort the names for verification, ABFS service should return the results in order.
|
||||
Arrays.sort(SORTED_ENTRY_NAMES);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListWithRange() throws IOException {
|
||||
try {
|
||||
FileStatus[] listResult = store.listStatus(new Path(path), startFrom);
|
||||
if (!expectedResult) {
|
||||
Assert.fail("Excepting failure with IllegalArgumentException");
|
||||
}
|
||||
verifyFileStatus(listResult, new Path(path), expectedStartIndexInArray);
|
||||
} catch (IllegalArgumentException ex) {
|
||||
if (expectedResult) {
|
||||
Assert.fail("Excepting success");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// compare the file status
|
||||
private void verifyFileStatus(FileStatus[] listResult, Path parentPath, int startIndexInSortedName) throws IOException {
|
||||
if (startIndexInSortedName == -1) {
|
||||
Assert.assertEquals("Expected empty FileStatus array", 0, listResult.length);
|
||||
return;
|
||||
}
|
||||
|
||||
FileStatus[] allFileStatuses = fs.listStatus(parentPath);
|
||||
Assert.assertEquals("number of dir/file doesn't match",
|
||||
SORTED_ENTRY_NAMES.length, allFileStatuses.length);
|
||||
int indexInResult = 0;
|
||||
for (int index = startIndexInSortedName; index < SORTED_ENTRY_NAMES.length; index++) {
|
||||
Assert.assertEquals("fileStatus doesn't match", allFileStatuses[index], listResult[indexInResult++]);
|
||||
}
|
||||
}
|
||||
|
||||
private void prepareTestFiles() throws IOException {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
// created 2 level file structures
|
||||
for (String levelOneFolder : SORTED_ENTRY_NAMES) {
|
||||
Path levelOnePath = new Path("/" + levelOneFolder);
|
||||
Assert.assertTrue(fs.mkdirs(levelOnePath));
|
||||
for (String fileName : SORTED_ENTRY_NAMES) {
|
||||
Path filePath = new Path(levelOnePath, fileName);
|
||||
ContractTestUtils.touch(fs, filePath);
|
||||
ContractTestUtils.assertIsFile(fs, filePath);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
|
||||
/**
|
||||
* Test for Crc64 in AzureBlobFileSystem, notice that ABFS CRC64 has its own polynomial.
|
||||
* */
|
||||
public class TestAbfsCrc64 {
|
||||
|
||||
@Test
|
||||
public void tesCrc64Compute() {
|
||||
CRC64 crc64 = new CRC64();
|
||||
final String[] testStr = {"#$", "dir_2_ac83abee", "dir_42_976df1f5"};
|
||||
final String[] expected = {"f91f7e6a837dbfa8", "203f9fefc38ae97b", "cc0d56eafe58a855"};
|
||||
for (int i = 0; i < testStr.length; i++) {
|
||||
Assert.assertEquals(expected[i], Long.toHexString(crc64.compute(testStr[i].getBytes())));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue