HADOOP-17765. ABFS: Use Unique File Paths in Tests. (#3153)
Contributed by Sumangala Patki
Change-Id: Ic8f34bf578069504f7a811a7729982b9c9f49729
parent 74f5f90615
commit 5e109705ef
@@ -26,11 +26,11 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
@@ -84,6 +84,7 @@ public abstract class AbstractAbfsIntegrationTest extends
private AuthType authType;
private boolean useConfiguredFileSystem = false;
private boolean usingFilesystemForSASTests = false;
private static final int SHORTENED_GUID_LEN = 12;

protected AbstractAbfsIntegrationTest() throws Exception {
fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
@@ -270,7 +271,8 @@ public abstract class AbstractAbfsIntegrationTest extends
// so first create temporary instance of the filesystem using SharedKey
// then re-use the filesystem it creates with SAS auth instead of SharedKey.
AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
Assert.assertTrue(tempFs.exists(new Path("/")));
ContractTestUtils.assertPathExists(tempFs, "This path should exist",
new Path("/"));
abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
usingFilesystemForSASTests = true;
}
@@ -440,7 +442,20 @@ public abstract class AbstractAbfsIntegrationTest extends
*/
protected Path path(String filepath) throws IOException {
return getFileSystem().makeQualified(
new Path(getTestPath(), filepath));
new Path(getTestPath(), getUniquePath(filepath)));
}

/**
* Generate a unique path using the given filepath.
* @param filepath path string
* @return unique path created from filepath and a GUID
*/
protected Path getUniquePath(String filepath) {
if (filepath.equals("/")) {
return new Path(filepath);
}
return new Path(filepath + StringUtils
.right(UUID.randomUUID().toString(), SHORTENED_GUID_LEN));
}

/**

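With these helpers in place, test cases obtain per-run paths through path(), which now routes every test-supplied name through getUniquePath(). The following is a minimal sketch of how a test picks this up; the test class and method names are hypothetical and only illustrate the pattern used throughout the patch:

```java
import org.junit.Test;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;

/** Hypothetical example test, not part of the patch. */
public class ITestExampleUniquePaths extends AbstractAbfsIntegrationTest {

  public ITestExampleUniquePaths() throws Exception {
  }

  @Test
  public void testCreateOnUniquePath() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    // path() qualifies the name under the test root, and getUniquePath()
    // appends the last 12 characters of a random GUID, so repeated or
    // parallel runs against a shared container do not reuse the same blob.
    Path testFile = path("testfile");
    fs.create(testFile).close();
    ContractTestUtils.assertPathExists(fs, "test file should exist", testFile);
  }
}
```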
@@ -93,7 +93,7 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
public void testListPathWithValidListMaxResultsValues()
throws IOException, ExecutionException, InterruptedException {
final int fileCount = 10;
final String directory = "testWithValidListMaxResultsValues";
final Path directory = getUniquePath("testWithValidListMaxResultsValues");
createDirectoryWithNFiles(directory, fileCount);
final int[] testData = {fileCount + 100, fileCount + 1, fileCount,
fileCount - 1, 1};
@@ -102,7 +102,7 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
setListMaxResults(listMaxResults);
int expectedListResultsSize =
listMaxResults > fileCount ? fileCount : listMaxResults;
Assertions.assertThat(listPath(directory)).describedAs(
Assertions.assertThat(listPath(directory.toString())).describedAs(
"AbfsClient.listPath result should contain %d items when "
+ "listMaxResults is %d and directory contains %d items",
expectedListResultsSize, listMaxResults, fileCount)
@@ -114,9 +114,10 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
public void testListPathWithValueGreaterThanServerMaximum()
throws IOException, ExecutionException, InterruptedException {
setListMaxResults(LIST_MAX_RESULTS_SERVER + 100);
final String directory = "testWithValueGreaterThanServerMaximum";
final Path directory = getUniquePath(
"testWithValueGreaterThanServerMaximum");
createDirectoryWithNFiles(directory, LIST_MAX_RESULTS_SERVER + 200);
Assertions.assertThat(listPath(directory)).describedAs(
Assertions.assertThat(listPath(directory.toString())).describedAs(
"AbfsClient.listPath result will contain a maximum of %d items "
+ "even if listMaxResults >= %d or directory "
+ "contains more than %d items", LIST_MAX_RESULTS_SERVER,
@@ -152,7 +153,7 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
.setListMaxResults(listMaxResults);
}

private void createDirectoryWithNFiles(String directory, int n)
private void createDirectoryWithNFiles(Path directory, int n)
throws ExecutionException, InterruptedException {
final List<Future<Void>> tasks = new ArrayList<>();
ExecutorService es = Executors.newFixedThreadPool(10);

@@ -237,8 +237,8 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe
@Test
public void testHasNextForFile() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
String testFileName = "testFile";
Path testFile = new Path(testFileName);
Path testFile = path("testFile");
String testFileName = testFile.toString();
getFileSystem().create(testFile);
setPageSize(10);
RemoteIterator<FileStatus> fsItr = fs.listStatusIterator(testFile);
@@ -304,7 +304,7 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe

private Path createTestDirectory() throws IOException {
String testDirectoryName = "testDirectory" + System.currentTimeMillis();
Path testDirectory = new Path(testDirectoryName);
Path testDirectory = path(testDirectoryName);
getFileSystem().mkdirs(testDirectory);
return testDirectory;
}

@@ -45,7 +45,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.M
*/
@RunWith(Parameterized.class)
public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
private static final Path TEST_PATH = new Path("/testfile");
private static final String TEST_PATH = "/testfile";

@Parameterized.Parameters(name = "Size={0}")
public static Iterable<Object[]> sizes() {
@@ -75,13 +75,14 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
final byte[] b = new byte[2 * bufferSize];
new Random().nextBytes(b);

try (FSDataOutputStream stream = fs.create(TEST_PATH)) {
Path testPath = path(TEST_PATH);
try (FSDataOutputStream stream = fs.create(testPath)) {
stream.write(b);
}

final byte[] readBuffer = new byte[2 * bufferSize];
int result;
try (FSDataInputStream inputStream = fs.open(TEST_PATH)) {
try (FSDataInputStream inputStream = fs.open(testPath)) {
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.READ, true, 0,
@@ -112,7 +113,8 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {

final byte[] b = new byte[bufferSize * 10];
new Random().nextBytes(b);
try (FSDataOutputStream stream = fs.create(TEST_PATH)) {
Path testPath = path(TEST_PATH);
try (FSDataOutputStream stream = fs.create(testPath)) {
((AbfsOutputStream) stream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.WRITE, false, 0,
@@ -126,7 +128,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
fs.registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.OPEN, false, 0));
try (FSDataInputStream inputStream = fs.open(TEST_PATH)) {
try (FSDataInputStream inputStream = fs.open(testPath)) {
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.READ, false, 0,

@@ -91,7 +91,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {

fs.mkdirs(createDirectoryPath);
fs.createNonRecursive(createFilePath, FsPermission
.getDefault(), false, 1024, (short) 1, 1024, null);
.getDefault(), false, 1024, (short) 1, 1024, null).close();

Map<String, Long> metricMap = fs.getInstrumentationMap();
/*
@@ -117,7 +117,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
fs.mkdirs(path(getMethodName() + "Dir" + i));
fs.createNonRecursive(path(getMethodName() + i),
FsPermission.getDefault(), false, 1024, (short) 1,
1024, null);
1024, null).close();
}

metricMap = fs.getInstrumentationMap();
@@ -160,7 +160,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
files_deleted counters.
*/
fs.mkdirs(createDirectoryPath);
fs.create(path(createDirectoryPath + getMethodName()));
fs.create(path(createDirectoryPath + getMethodName())).close();
fs.delete(createDirectoryPath, true);

Map<String, Long> metricMap = fs.getInstrumentationMap();
@@ -179,7 +179,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
directories_deleted is called or not.
*/
fs.mkdirs(createDirectoryPath);
fs.create(createFilePath);
fs.create(createFilePath).close();
fs.delete(createDirectoryPath, true);
metricMap = fs.getInstrumentationMap();

@@ -199,9 +199,9 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
Path createFilePath = path(getMethodName());
Path destCreateFilePath = path(getMethodName() + "New");

fs.create(createFilePath);
fs.open(createFilePath);
fs.append(createFilePath);
fs.create(createFilePath).close();
fs.open(createFilePath).close();
fs.append(createFilePath).close();
assertTrue(fs.rename(createFilePath, destCreateFilePath));

Map<String, Long> metricMap = fs.getInstrumentationMap();
@@ -225,11 +225,11 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
//re-initialising Abfs to reset statistic values.
fs.initialize(fs.getUri(), fs.getConf());

fs.create(destCreateFilePath);
fs.create(destCreateFilePath).close();

for (int i = 0; i < NUMBER_OF_OPS; i++) {
fs.open(destCreateFilePath);
fs.append(destCreateFilePath);
fs.append(destCreateFilePath).close();
}

metricMap = fs.getInstrumentationMap();

@@ -52,8 +52,8 @@ public class ITestAbfsStreamStatistics extends AbstractAbfsIntegrationTest {
+ "Abfs");

final AzureBlobFileSystem fs = getFileSystem();
Path smallOperationsFile = new Path("testOneReadWriteOps");
Path largeOperationsFile = new Path("testLargeReadWriteOps");
Path smallOperationsFile = path("testOneReadWriteOps");
Path largeOperationsFile = path("testLargeReadWriteOps");
FileSystem.Statistics statistics = fs.getFsStatistics();
String testReadWriteOps = "test this";
statistics.reset();

@@ -35,8 +35,8 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
*/
public class ITestAzureBlobFileSystemAppend extends
AbstractAbfsIntegrationTest {
private static final Path TEST_FILE_PATH = new Path("testfile");
private static final Path TEST_FOLDER_PATH = new Path("testFolder");
private static final String TEST_FILE_PATH = "testfile";
private static final String TEST_FOLDER_PATH = "testFolder";

public ITestAzureBlobFileSystemAppend() throws Exception {
super();
@@ -45,15 +45,15 @@ public class ITestAzureBlobFileSystemAppend extends
@Test(expected = FileNotFoundException.class)
public void testAppendDirShouldFail() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path filePath = TEST_FILE_PATH;
final Path filePath = path(TEST_FILE_PATH);
fs.mkdirs(filePath);
fs.append(filePath, 0);
fs.append(filePath, 0).close();
}

@Test
public void testAppendWithLength0() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
try(FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
try(FSDataOutputStream stream = fs.create(path(TEST_FILE_PATH))) {
final byte[] b = new byte[1024];
new Random().nextBytes(b);
stream.write(b, 1000, 0);
@@ -65,28 +65,29 @@ public class ITestAzureBlobFileSystemAppend extends
@Test(expected = FileNotFoundException.class)
public void testAppendFileAfterDelete() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path filePath = TEST_FILE_PATH;
final Path filePath = path(TEST_FILE_PATH);
ContractTestUtils.touch(fs, filePath);
fs.delete(filePath, false);

fs.append(filePath);
fs.append(filePath).close();
}

@Test(expected = FileNotFoundException.class)
public void testAppendDirectory() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path folderPath = TEST_FOLDER_PATH;
final Path folderPath = path(TEST_FOLDER_PATH);
fs.mkdirs(folderPath);
fs.append(folderPath);
fs.append(folderPath).close();
}

@Test
public void testTracingForAppend() throws IOException {
AzureBlobFileSystem fs = getFileSystem();
fs.create(TEST_FILE_PATH);
Path testPath = path(TEST_FILE_PATH);
fs.create(testPath).close();
fs.registerListener(new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
fs.append(TEST_FILE_PATH, 10);
fs.append(testPath, 10);
}
}

@@ -99,7 +99,7 @@ public class ITestAzureBlobFileSystemAuthorization extends AbstractAbfsIntegrati
this.getConfiguration().getRawConfiguration());
intercept(SASTokenProviderException.class,
() -> {
testFs.create(new org.apache.hadoop.fs.Path("/testFile"));
testFs.create(new org.apache.hadoop.fs.Path("/testFile")).close();
});
}

@@ -114,7 +114,7 @@ public class ITestAzureBlobFileSystemAuthorization extends AbstractAbfsIntegrati
testFs.initialize(fs.getUri(), this.getConfiguration().getRawConfiguration());
intercept(SASTokenProviderException.class,
()-> {
testFs.create(new org.apache.hadoop.fs.Path("/testFile"));
testFs.create(new org.apache.hadoop.fs.Path("/testFile")).close();
});
}

@@ -297,7 +297,7 @@ public class ITestAzureBlobFileSystemAuthorization extends AbstractAbfsIntegrati
fs.listStatus(reqPath);
break;
case CreatePath:
fs.create(reqPath);
fs.create(reqPath).close();
break;
case RenamePath:
fs.rename(reqPath,

@@ -50,13 +50,16 @@ public class ITestAzureBlobFileSystemBackCompat extends
CloudBlobContainer container = blobClient.getContainerReference(this.getFileSystemName());
container.createIfNotExists();

CloudBlockBlob blockBlob = container.getBlockBlobReference("test/10/10/10");
Path testPath = getUniquePath("test");
CloudBlockBlob blockBlob = container
.getBlockBlobReference(testPath + "/10/10/10");
blockBlob.uploadText("");

blockBlob = container.getBlockBlobReference("test/10/123/3/2/1/3");
blockBlob = container.getBlockBlobReference(testPath + "/10/123/3/2/1/3");
blockBlob.uploadText("");

FileStatus[] fileStatuses = fs.listStatus(new Path("/test/10/"));
FileStatus[] fileStatuses = fs
.listStatus(new Path(String.format("/%s/10/", testPath)));
assertEquals(2, fileStatuses.length);
assertEquals("10", fileStatuses[0].getPath().getName());
assertTrue(fileStatuses[0].isDirectory());

@@ -352,7 +352,8 @@ public class ITestAzureBlobFileSystemCheckAccess

private Path setupTestDirectoryAndUserAccess(String testFileName,
FsAction fsAction) throws Exception {
Path file = new Path(TEST_FOLDER_PATH + testFileName);
Path testPath = path(TEST_FOLDER_PATH);
Path file = new Path(testPath + testFileName);
file = this.superUserFs.makeQualified(file);
this.superUserFs.delete(file, true);
this.superUserFs.create(file);

@@ -53,7 +53,7 @@ public class ITestAzureBlobFileSystemCopy extends AbstractAbfsIntegrationTest {
localFs.delete(localFilePath, true);
try {
writeString(localFs, localFilePath, "Testing");
Path dstPath = new Path("copiedFromLocal");
Path dstPath = path("copiedFromLocal");
assertTrue(FileUtil.copy(localFs, localFilePath, fs, dstPath, false,
fs.getConf()));
assertIsFile(fs, dstPath);

@@ -69,7 +69,7 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
public class ITestAzureBlobFileSystemCreate extends
AbstractAbfsIntegrationTest {
private static final Path TEST_FILE_PATH = new Path("testfile");
private static final Path TEST_FOLDER_PATH = new Path("testFolder");
private static final String TEST_FOLDER_PATH = "testFolder";
private static final String TEST_CHILD_FILE = "childFile";

public ITestAzureBlobFileSystemCreate() throws Exception {
@@ -92,7 +92,8 @@ public class ITestAzureBlobFileSystemCreate extends
@SuppressWarnings("deprecation")
public void testCreateNonRecursive() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
Path testFolderPath = path(TEST_FOLDER_PATH);
Path testFile = new Path(testFolderPath, TEST_CHILD_FILE);
try {
fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null);
fail("Should've thrown");
@@ -101,7 +102,7 @@ public class ITestAzureBlobFileSystemCreate extends
fs.registerListener(new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.MKDIR, false, 0));
fs.mkdirs(TEST_FOLDER_PATH);
fs.mkdirs(testFolderPath);
fs.registerListener(null);

fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
@@ -113,13 +114,14 @@ public class ITestAzureBlobFileSystemCreate extends
@SuppressWarnings("deprecation")
public void testCreateNonRecursive1() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
Path testFolderPath = path(TEST_FOLDER_PATH);
Path testFile = new Path(testFolderPath, TEST_CHILD_FILE);
try {
fs.createNonRecursive(testFile, FsPermission.getDefault(), EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 1024, (short) 1, 1024, null);
fail("Should've thrown");
} catch (FileNotFoundException expected) {
}
fs.mkdirs(TEST_FOLDER_PATH);
fs.mkdirs(testFolderPath);
fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
.close();
assertIsFile(fs, testFile);
@@ -131,13 +133,14 @@ public class ITestAzureBlobFileSystemCreate extends
public void testCreateNonRecursive2() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();

Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
Path testFolderPath = path(TEST_FOLDER_PATH);
Path testFile = new Path(testFolderPath, TEST_CHILD_FILE);
try {
fs.createNonRecursive(testFile, FsPermission.getDefault(), false, 1024, (short) 1, 1024, null);
fail("Should've thrown");
} catch (FileNotFoundException e) {
}
fs.mkdirs(TEST_FOLDER_PATH);
fs.mkdirs(testFolderPath);
fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
.close();
assertIsFile(fs, testFile);
@@ -149,7 +152,8 @@ public class ITestAzureBlobFileSystemCreate extends
@Test
public void testWriteAfterClose() throws Throwable {
final AzureBlobFileSystem fs = getFileSystem();
Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
Path testFolderPath = path(TEST_FOLDER_PATH);
Path testPath = new Path(testFolderPath, TEST_CHILD_FILE);
FSDataOutputStream out = fs.create(testPath);
out.close();
intercept(IOException.class, () -> out.write('a'));
@@ -169,7 +173,8 @@ public class ITestAzureBlobFileSystemCreate extends
@Test
public void testTryWithResources() throws Throwable {
final AzureBlobFileSystem fs = getFileSystem();
Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
Path testFolderPath = path(TEST_FOLDER_PATH);
Path testPath = new Path(testFolderPath, TEST_CHILD_FILE);
try (FSDataOutputStream out = fs.create(testPath)) {
out.write('1');
out.hsync();
@@ -202,7 +207,8 @@ public class ITestAzureBlobFileSystemCreate extends
@Test
public void testFilterFSWriteAfterClose() throws Throwable {
final AzureBlobFileSystem fs = getFileSystem();
Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
Path testFolderPath = path(TEST_FOLDER_PATH);
Path testPath = new Path(testFolderPath, TEST_CHILD_FILE);
FSDataOutputStream out = fs.create(testPath);
intercept(FileNotFoundException.class,
() -> {

@@ -53,6 +53,8 @@ import org.apache.hadoop.security.AccessControlException;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH;
import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
@@ -223,15 +225,15 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
stream.writeBytes("hello");
}

assertFalse(fs.exists(destinationPath));
assertPathDoesNotExist(fs, "This path should not exist", destinationPath);
fs.rename(sourcePath, destinationPath);
assertFalse(fs.exists(sourcePath));
assertTrue(fs.exists(destinationPath));
assertPathDoesNotExist(fs, "This path should not exist", sourcePath);
assertPathExists(fs, "This path should exist", destinationPath);

assertFalse(fs.exists(destinationDir));
assertPathDoesNotExist(fs, "This path should not exist", destinationDir);
fs.rename(sourceDir, destinationDir);
assertFalse(fs.exists(sourceDir));
assertTrue(fs.exists(destinationDir));
assertPathDoesNotExist(fs, "This path should not exist", sourceDir);
assertPathExists(fs, "This path should exist", destinationDir);
}

@Test
@@ -246,13 +248,13 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
stream.writeBytes("hello");
}

assertTrue(fs.exists(filePath));
assertPathExists(fs, "This path should exist", filePath);
fs.delete(filePath, false);
assertFalse(fs.exists(filePath));
assertPathDoesNotExist(fs, "This path should not exist", filePath);

assertTrue(fs.exists(dirPath));
assertPathExists(fs, "This path should exist", dirPath);
fs.delete(dirPath, false);
assertFalse(fs.exists(dirPath));
assertPathDoesNotExist(fs, "This path should not exist", dirPath);
}

@Test
@@ -267,11 +269,11 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
stream.writeBytes("hello");
}

assertTrue(fs.exists(dirPath));
assertTrue(fs.exists(filePath));
assertPathExists(fs, "This path should exist", dirPath);
assertPathExists(fs, "This path should exist", filePath);
fs.delete(dirPath, true);
assertFalse(fs.exists(filePath));
assertFalse(fs.exists(dirPath));
assertPathDoesNotExist(fs, "This path should not exist", filePath);
assertPathDoesNotExist(fs, "This path should not exist", dirPath);
}

@Test
@@ -395,8 +397,8 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
@Test
public void testSignatureMask() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
String src = "/testABC/test.xt";
fs.create(new Path(src));
String src = String.format("/testABC/test%s.xt", UUID.randomUUID());
fs.create(new Path(src)).close();
AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
.renamePath(src, "/testABC" + "/abc.txt", null,
getTestTracingContext(fs, false));

@@ -79,12 +79,13 @@ public class ITestAzureBlobFileSystemDelete extends
public void testDeleteRoot() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();

fs.mkdirs(new Path("/testFolder0"));
fs.mkdirs(new Path("/testFolder1"));
fs.mkdirs(new Path("/testFolder2"));
touch(new Path("/testFolder1/testfile"));
touch(new Path("/testFolder1/testfile2"));
touch(new Path("/testFolder1/testfile3"));
Path testPath = path("/testFolder");
fs.mkdirs(new Path(testPath + "_0"));
fs.mkdirs(new Path(testPath + "_1"));
fs.mkdirs(new Path(testPath + "_2"));
touch(new Path(testPath + "_1/testfile"));
touch(new Path(testPath + "_1/testfile2"));
touch(new Path(testPath + "_1/testfile3"));

Path root = new Path("/");
FileStatus[] ls = fs.listStatus(root);
@@ -98,7 +99,7 @@ public class ITestAzureBlobFileSystemDelete extends
@Test()
public void testOpenFileAfterDelete() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Path testfile = new Path("/testFile");
Path testfile = path("/testFile");
touch(testfile);
assertDeleted(fs, testfile, false);

@@ -109,7 +110,7 @@ public class ITestAzureBlobFileSystemDelete extends
@Test
public void testEnsureFileIsDeleted() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Path testfile = new Path("testfile");
Path testfile = path("testfile");
touch(testfile);
assertDeleted(fs, testfile, false);
assertPathDoesNotExist(fs, "deleted", testfile);
@@ -118,10 +119,10 @@ public class ITestAzureBlobFileSystemDelete extends
@Test
public void testDeleteDirectory() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Path dir = new Path("testfile");
Path dir = path("testfile");
fs.mkdirs(dir);
fs.mkdirs(new Path("testfile/test1"));
fs.mkdirs(new Path("testfile/test1/test2"));
fs.mkdirs(new Path(dir + "/test1"));
fs.mkdirs(new Path(dir + "/test1/test2"));

assertDeleted(fs, dir, true);
assertPathDoesNotExist(fs, "deleted", dir);
@@ -133,8 +134,9 @@ public class ITestAzureBlobFileSystemDelete extends
final List<Future<Void>> tasks = new ArrayList<>();

ExecutorService es = Executors.newFixedThreadPool(10);
Path dir = path("/test");
for (int i = 0; i < 1000; i++) {
final Path fileName = new Path("/test/" + i);
final Path fileName = new Path(dir + "/" + i);
Callable<Void> callable = new Callable<Void>() {
@Override
public Void call() throws Exception {
@@ -151,7 +153,6 @@ public class ITestAzureBlobFileSystemDelete extends
}

es.shutdownNow();
Path dir = new Path("/test");
fs.registerListener(new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.DELETE, false, 0));

@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
@@ -52,14 +54,14 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {

@Test
public void testWriteOneByteToFile() throws Exception {
final Path testFilePath = new Path(methodName.getMethodName());
final Path testFilePath = path(methodName.getMethodName());
testWriteOneByteToFile(testFilePath);
}

@Test
public void testReadWriteBytesToFile() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path testFilePath = new Path(methodName.getMethodName());
final Path testFilePath = path(methodName.getMethodName());
testWriteOneByteToFile(testFilePath);
try(FSDataInputStream inputStream = fs.open(testFilePath,
TEST_DEFAULT_BUFFER_SIZE)) {
@@ -78,7 +80,7 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
final byte[] b = new byte[2 * readBufferSize];
new Random().nextBytes(b);

final Path testFilePath = new Path(methodName.getMethodName());
final Path testFilePath = path(methodName.getMethodName());
try(FSDataOutputStream writeStream = fs.create(testFilePath)) {
writeStream.write(b);
writeStream.flush();
@@ -107,7 +109,7 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
byte[] bytesToRead = new byte[readBufferSize];
final byte[] b = new byte[2 * readBufferSize];
new Random().nextBytes(b);
final Path testFilePath = new Path(methodName.getMethodName());
final Path testFilePath = path(methodName.getMethodName());

try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
writeStream.write(b);
@@ -130,7 +132,7 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
@Test
public void testWriteWithBufferOffset() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path testFilePath = new Path(methodName.getMethodName());
final Path testFilePath = path(methodName.getMethodName());

final byte[] b = new byte[1024 * 1000];
new Random().nextBytes(b);
@@ -151,7 +153,7 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
@Test
public void testReadWriteHeavyBytesToFileWithSmallerChunks() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path testFilePath = new Path(methodName.getMethodName());
final Path testFilePath = path(methodName.getMethodName());

final byte[] writeBuffer = new byte[5 * 1000 * 1024];
new Random().nextBytes(writeBuffer);
@@ -171,50 +173,51 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
@Test
public void testReadWithFileNotFoundException() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path testFilePath = new Path(methodName.getMethodName());
final Path testFilePath = path(methodName.getMethodName());
testWriteOneByteToFile(testFilePath);

FSDataInputStream inputStream = fs.open(testFilePath, TEST_DEFAULT_BUFFER_SIZE);
try (FSDataInputStream inputStream = fs.open(testFilePath,
TEST_DEFAULT_BUFFER_SIZE)) {
fs.delete(testFilePath, true);
assertFalse(fs.exists(testFilePath));
assertPathDoesNotExist(fs, "This path should not exist", testFilePath);

intercept(FileNotFoundException.class,
() -> inputStream.read(new byte[1]));
intercept(FileNotFoundException.class, () -> inputStream.read(new byte[1]));
}
}

@Test
public void testWriteWithFileNotFoundException() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path testFilePath = new Path(methodName.getMethodName());
final Path testFilePath = path(methodName.getMethodName());

FSDataOutputStream stream = fs.create(testFilePath);
assertTrue(fs.exists(testFilePath));
try (FSDataOutputStream stream = fs.create(testFilePath)) {
assertPathExists(fs, "Path should exist", testFilePath);
stream.write(TEST_BYTE);

fs.delete(testFilePath, true);
assertFalse(fs.exists(testFilePath));
assertPathDoesNotExist(fs, "This path should not exist", testFilePath);

// trigger append call
intercept(FileNotFoundException.class,
() -> stream.close());
intercept(FileNotFoundException.class, () -> stream.close());
}
}

@Test
public void testFlushWithFileNotFoundException() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path testFilePath = new Path(methodName.getMethodName());
final Path testFilePath = path(methodName.getMethodName());
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
return;
}

FSDataOutputStream stream = fs.create(testFilePath);
assertTrue(fs.exists(testFilePath));
try (FSDataOutputStream stream = fs.create(testFilePath)) {
assertPathExists(fs, "This path should exist", testFilePath);

fs.delete(testFilePath, true);
assertFalse(fs.exists(testFilePath));
assertPathDoesNotExist(fs, "This path should not exist", testFilePath);

intercept(FileNotFoundException.class,
() -> stream.close());
intercept(FileNotFoundException.class, () -> stream.close());
}
}

private void testWriteOneByteToFile(Path testFilePath) throws Exception {

@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;

/**
* Test FileStatus.
*/
@@ -37,8 +39,8 @@ public class ITestAzureBlobFileSystemFileStatus extends
private static final String DEFAULT_UMASK_VALUE = "027";
private static final String FULL_PERMISSION = "777";

private static final Path TEST_FILE = new Path("testFile");
private static final Path TEST_FOLDER = new Path("testDir");
private static final String TEST_FILE = "testFile";
private static final String TEST_FOLDER = "testDir";

public ITestAzureBlobFileSystemFileStatus() throws Exception {
super();
@@ -57,8 +59,9 @@ public class ITestAzureBlobFileSystemFileStatus extends
public void testFileStatusPermissionsAndOwnerAndGroup() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
touch(TEST_FILE);
validateStatus(fs, TEST_FILE, false);
Path testFile = path(TEST_FILE);
touch(testFile);
validateStatus(fs, testFile, false);
}

private FileStatus validateStatus(final AzureBlobFileSystem fs, final Path name, final boolean isDir)
@@ -93,9 +96,10 @@ public class ITestAzureBlobFileSystemFileStatus extends
public void testFolderStatusPermissionsAndOwnerAndGroup() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
fs.mkdirs(TEST_FOLDER);
Path testFolder = path(TEST_FOLDER);
fs.mkdirs(testFolder);

validateStatus(fs, TEST_FOLDER, true);
validateStatus(fs, testFolder, true);
}

@Test
@@ -108,11 +112,11 @@ public class ITestAzureBlobFileSystemFileStatus extends
Path pathwithouthost2 = new Path("/abfs/file2.txt");

// verify compatibility of this path format
fs.create(pathWithHost1);
assertTrue(fs.exists(pathwithouthost1));
fs.create(pathWithHost1).close();
assertPathExists(fs, "This path should exist", pathwithouthost1);

fs.create(pathwithouthost2);
assertTrue(fs.exists(pathWithHost2));
fs.create(pathwithouthost2).close();
assertPathExists(fs, "This path should exist", pathWithHost2);

// verify get
FileStatus fileStatus1 = fs.getFileStatus(pathWithHost1);
@@ -125,13 +129,13 @@ public class ITestAzureBlobFileSystemFileStatus extends
@Test
public void testLastModifiedTime() throws IOException {
AzureBlobFileSystem fs = this.getFileSystem();
Path testFilePath = new Path("childfile1.txt");
Path testFilePath = path("childfile1.txt");
long createStartTime = System.currentTimeMillis();
long minCreateStartTime = (createStartTime / 1000) * 1000 - 1;
// Dividing and multiplying by 1000 to make last 3 digits 0.
// It is observed that modification time is returned with last 3
// digits 0 always.
fs.create(testFilePath);
fs.create(testFilePath).close();
long createEndTime = System.currentTimeMillis();
FileStatus fStat = fs.getFileStatus(testFilePath);
long lastModifiedTime = fStat.getModificationTime();

@@ -316,15 +316,14 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {

byte[] buf = new byte[10];
new Random().nextBytes(buf);
FSDataOutputStream out = fs.create(new Path("/testFile"));
((AbfsOutputStream) out.getWrappedStream()).registerListener(
new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.WRITE, false, 0,
try (FSDataOutputStream out = fs.create(new Path("/testFile"))) {
((AbfsOutputStream) out.getWrappedStream()).registerListener(new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), fs.getFileSystemId(), FSOperationType.WRITE, false, 0,
((AbfsOutputStream) out.getWrappedStream()).getStreamID()));
out.write(buf);
out.hsync();
}
}

@Test
public void testStreamCapabilitiesWithFlushDisabled() throws Exception {

@@ -99,7 +99,7 @@ public class ITestAzureBlobFileSystemListStatus extends
@Test
public void testListFileVsListDir() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Path path = new Path("/testFile");
Path path = path("/testFile");
try(FSDataOutputStream ignored = fs.create(path)) {
FileStatus[] testFiles = fs.listStatus(path);
assertEquals("length of test files", 1, testFiles.length);
@@ -111,19 +111,20 @@ public class ITestAzureBlobFileSystemListStatus extends
@Test
public void testListFileVsListDir2() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
fs.mkdirs(new Path("/testFolder"));
fs.mkdirs(new Path("/testFolder/testFolder2"));
fs.mkdirs(new Path("/testFolder/testFolder2/testFolder3"));
Path testFile0Path = new Path("/testFolder/testFolder2/testFolder3/testFile");
Path testFolder = path("/testFolder");
fs.mkdirs(testFolder);
fs.mkdirs(new Path(testFolder + "/testFolder2"));
fs.mkdirs(new Path(testFolder + "/testFolder2/testFolder3"));
Path testFile0Path = new Path(
testFolder + "/testFolder2/testFolder3/testFile");
ContractTestUtils.touch(fs, testFile0Path);

FileStatus[] testFiles = fs.listStatus(testFile0Path);
assertEquals("Wrong listing size of file " + testFile0Path,
1, testFiles.length);
FileStatus file0 = testFiles[0];
assertEquals("Wrong path for " + file0,
new Path(getTestUrl(), "/testFolder/testFolder2/testFolder3/testFile"),
file0.getPath());
assertEquals("Wrong path for " + file0, new Path(getTestUrl(),
testFolder + "/testFolder2/testFolder3/testFile"), file0.getPath());
assertIsFileReference(file0);
}

@@ -136,18 +137,18 @@ public class ITestAzureBlobFileSystemListStatus extends
@Test
public void testListFiles() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Path testDir = new Path("/test");
Path testDir = path("/test");
fs.mkdirs(testDir);

FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
assertEquals(1, fileStatuses.length);

fs.mkdirs(new Path("/test/sub"));
fs.mkdirs(new Path(testDir + "/sub"));
fileStatuses = fs.listStatus(testDir);
assertEquals(1, fileStatuses.length);
assertEquals("sub", fileStatuses[0].getPath().getName());
assertIsDirectoryReference(fileStatuses[0]);
Path childF = fs.makeQualified(new Path("/test/f"));
Path childF = fs.makeQualified(new Path(testDir + "/f"));
touch(childF);
fileStatuses = fs.listStatus(testDir);
assertEquals(2, fileStatuses.length);
@@ -193,7 +194,7 @@ public class ITestAzureBlobFileSystemListStatus extends
final AzureBlobFileSystem fs = getFileSystem();

Path nontrailingPeriodDir = path("testTrailingDir/dir");
Path trailingPeriodDir = path("testTrailingDir/dir.");
Path trailingPeriodDir = new Path("testMkdirTrailingDir/dir.");

assertMkdirs(fs, nontrailingPeriodDir);

@@ -212,8 +213,8 @@ public class ITestAzureBlobFileSystemListStatus extends
boolean exceptionThrown = false;
final AzureBlobFileSystem fs = getFileSystem();

Path trailingPeriodFile = path("testTrailingDir/file.");
Path nontrailingPeriodFile = path("testTrailingDir/file");
Path trailingPeriodFile = new Path("testTrailingDir/file.");
Path nontrailingPeriodFile = path("testCreateTrailingDir/file");

createFile(fs, nontrailingPeriodFile, false, new byte[0]);
assertPathExists(fs, "Trailing period file does not exist",
@@ -235,7 +236,7 @@ public class ITestAzureBlobFileSystemListStatus extends
final AzureBlobFileSystem fs = getFileSystem();

Path nonTrailingPeriodFile = path("testTrailingDir/file");
Path trailingPeriodFile = path("testTrailingDir/file.");
Path trailingPeriodFile = new Path("testRenameTrailingDir/file.");

createFile(fs, nonTrailingPeriodFile, false, new byte[0]);
try {

@@ -49,7 +49,7 @@ public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE || !getIsNamespaceEnabled(
getFileSystem()));
final AzureBlobFileSystem fs = getFileSystem();
Path path = new Path("testFolder");
Path path = path("testFolder");
assertMkdirs(fs, path);
assertMkdirs(fs, path);
}
@@ -64,7 +64,7 @@ public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
Configuration config = new Configuration(this.getRawConfiguration());
config.set(FS_AZURE_ENABLE_MKDIR_OVERWRITE, Boolean.toString(false));
AzureBlobFileSystem fs = getFileSystem(config);
Path path = new Path("testFolder");
Path path = path("testFolder");
assertMkdirs(fs, path); //checks that mkdirs returns true
long timeCreated = fs.getFileStatus(path).getModificationTime();
assertMkdirs(fs, path); //call to existing dir should return success
@@ -78,8 +78,8 @@ public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE && getIsNamespaceEnabled(
getFileSystem()));
final AzureBlobFileSystem fs = getFileSystem();
Path path = new Path("testFilePath");
fs.create(path);
Path path = path("testFilePath");
fs.create(path).close();
assertTrue(fs.getFileStatus(path).isFile());
intercept(FileAlreadyExistsException.class, () -> fs.mkdirs(path));
}

@@ -45,6 +45,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_A
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_ID;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;

/**
* Test Azure Oauth with Blob Data contributor role and Blob Data Reader role.
@@ -54,8 +56,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_A
public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{

private static final Path FILE_PATH = new Path("/testFile");
private static final Path EXISTED_FILE_PATH = new Path("/existedFile");
private static final Path EXISTED_FOLDER_PATH = new Path("/existedFolder");
private static final String EXISTED_FILE_PATH = "/existedFile";
private static final String EXISTED_FOLDER_PATH = "/existedFolder";
private static final Logger LOG =
LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);

@@ -72,7 +74,9 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
String secret = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET);
Assume.assumeTrue("Contributor client secret not provided", secret != null);

prepareFiles();
Path existedFilePath = path(EXISTED_FILE_PATH);
Path existedFolderPath = path(EXISTED_FOLDER_PATH);
prepareFiles(existedFilePath, existedFolderPath);

final AzureBlobFileSystem fs = getBlobConributor();

@@ -80,39 +84,39 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
try(FSDataOutputStream stream = fs.create(FILE_PATH)) {
stream.write(0);
}
assertTrue(fs.exists(FILE_PATH));
assertPathExists(fs, "This path should exist", FILE_PATH);
FileStatus fileStatus = fs.getFileStatus(FILE_PATH);
assertEquals(1, fileStatus.getLen());
// delete file
assertTrue(fs.delete(FILE_PATH, true));
assertFalse(fs.exists(FILE_PATH));
assertPathDoesNotExist(fs, "This path should not exist", FILE_PATH);

// Verify Blob Data Contributor has full access to existed folder, file

// READ FOLDER
assertTrue(fs.exists(EXISTED_FOLDER_PATH));
assertPathExists(fs, "This path should exist", existedFolderPath);

//DELETE FOLDER
fs.delete(EXISTED_FOLDER_PATH, true);
assertFalse(fs.exists(EXISTED_FOLDER_PATH));
fs.delete(existedFolderPath, true);
assertPathDoesNotExist(fs, "This path should not exist", existedFolderPath);

// READ FILE
try (FSDataInputStream stream = fs.open(EXISTED_FILE_PATH)) {
try (FSDataInputStream stream = fs.open(existedFilePath)) {
assertTrue(stream.read() != 0);
}

assertEquals(0, fs.getFileStatus(EXISTED_FILE_PATH).getLen());
assertEquals(0, fs.getFileStatus(existedFilePath).getLen());

// WRITE FILE
try (FSDataOutputStream stream = fs.append(EXISTED_FILE_PATH)) {
try (FSDataOutputStream stream = fs.append(existedFilePath)) {
stream.write(0);
}

assertEquals(1, fs.getFileStatus(EXISTED_FILE_PATH).getLen());
assertEquals(1, fs.getFileStatus(existedFilePath).getLen());

// REMOVE FILE
fs.delete(EXISTED_FILE_PATH, true);
assertFalse(fs.exists(EXISTED_FILE_PATH));
fs.delete(existedFilePath, true);
assertPathDoesNotExist(fs, "This path should not exist", existedFilePath);
}

/*
@@ -125,7 +129,9 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
String secret = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET);
Assume.assumeTrue("Reader client secret not provided", secret != null);

prepareFiles();
Path existedFilePath = path(EXISTED_FILE_PATH);
Path existedFolderPath = path(EXISTED_FOLDER_PATH);
prepareFiles(existedFilePath, existedFolderPath);
final AzureBlobFileSystem fs = getBlobReader();

// Use abfsStore in this test to verify the ERROR code in AbfsRestOperationException
@@ -134,24 +140,24 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
// TEST READ FS
Map<String, String> properties = abfsStore.getFilesystemProperties(tracingContext);
// TEST READ FOLDER
assertTrue(fs.exists(EXISTED_FOLDER_PATH));
assertPathExists(fs, "This path should exist", existedFolderPath);

// TEST DELETE FOLDER
try {
abfsStore.delete(EXISTED_FOLDER_PATH, true, tracingContext);
abfsStore.delete(existedFolderPath, true, tracingContext);
} catch (AbfsRestOperationException e) {
assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode());
}

// TEST READ FILE
try (InputStream inputStream = abfsStore.openFileForRead(EXISTED_FILE_PATH, null,
tracingContext)) {
try (InputStream inputStream = abfsStore
.openFileForRead(existedFilePath, null, tracingContext)) {
assertTrue(inputStream.read() != 0);
}

// TEST WRITE FILE
try {
abfsStore.openFileForWrite(EXISTED_FILE_PATH, fs.getFsStatistics(), true,
abfsStore.openFileForWrite(existedFilePath, fs.getFsStatistics(), true,
tracingContext);
} catch (AbfsRestOperationException e) {
assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode());
@@ -161,14 +167,14 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{

}

private void prepareFiles() throws IOException {
private void prepareFiles(Path existedFilePath, Path existedFolderPath) throws IOException {
// create test files/folders to verify access control diff between
// Blob data contributor and Blob data reader
final AzureBlobFileSystem fs = this.getFileSystem();
fs.create(EXISTED_FILE_PATH);
assertTrue(fs.exists(EXISTED_FILE_PATH));
fs.mkdirs(EXISTED_FOLDER_PATH);
assertTrue(fs.exists(EXISTED_FOLDER_PATH));
fs.create(existedFilePath).close();
assertPathExists(fs, "This path should exist", existedFilePath);
fs.mkdirs(existedFolderPath);
assertPathExists(fs, "This path should exist", existedFolderPath);
}

private AzureBlobFileSystem getBlobConributor() throws Exception {

@@ -84,7 +84,8 @@ public class ITestAzureBlobFileSystemPermission extends AbstractAbfsIntegrationT
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
fs.removeDefaultAcl(path.getParent());

fs.create(path, permission, true, KILOBYTE, (short) 1, KILOBYTE - 1, null);
fs.create(path, permission, true, KILOBYTE, (short) 1, KILOBYTE - 1,
null).close();
FileStatus status = fs.getFileStatus(path);
Assert.assertEquals(permission.applyUMask(DEFAULT_UMASK_PERMISSION), status.getPermission());
}

@@ -86,7 +86,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
   @Test
   public void testBasicRead() throws Exception {
-    Path testPath = new Path(TEST_FILE_PREFIX + "_testBasicRead");
+    Path testPath = path(TEST_FILE_PREFIX + "_testBasicRead");
     assumeHugeFileExists(testPath);
 
     try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {

@@ -115,7 +115,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
   public void testRandomRead() throws Exception {
     Assume.assumeFalse("This test does not support namespace enabled account",
         getIsNamespaceEnabled(getFileSystem()));
-    Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomRead");
+    Path testPath = path(TEST_FILE_PREFIX + "_testRandomRead");
     assumeHugeFileExists(testPath);
 
     try (

@@ -174,7 +174,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSeekToNewSource() throws Exception {
-    Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekToNewSource");
+    Path testPath = path(TEST_FILE_PREFIX + "_testSeekToNewSource");
     assumeHugeFileExists(testPath);
 
     try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {

@@ -189,7 +189,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSkipBounds() throws Exception {
-    Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipBounds");
+    Path testPath = path(TEST_FILE_PREFIX + "_testSkipBounds");
     long testFileLength = assumeHugeFileExists(testPath);
 
     try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {

@@ -230,7 +230,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testValidateSeekBounds() throws Exception {
-    Path testPath = new Path(TEST_FILE_PREFIX + "_testValidateSeekBounds");
+    Path testPath = path(TEST_FILE_PREFIX + "_testValidateSeekBounds");
     long testFileLength = assumeHugeFileExists(testPath);
 
     try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {

@@ -281,7 +281,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSeekAndAvailableAndPosition() throws Exception {
-    Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekAndAvailableAndPosition");
+    Path testPath = path(TEST_FILE_PREFIX + "_testSeekAndAvailableAndPosition");
     long testFileLength = assumeHugeFileExists(testPath);
 
     try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {

@@ -347,7 +347,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSkipAndAvailableAndPosition() throws Exception {
-    Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipAndAvailableAndPosition");
+    Path testPath = path(TEST_FILE_PREFIX + "_testSkipAndAvailableAndPosition");
     long testFileLength = assumeHugeFileExists(testPath);
 
     try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {

@@ -413,7 +413,8 @@ public class ITestAzureBlobFileSystemRandomRead extends
   @Test
   public void testSequentialReadAfterReverseSeekPerformance()
       throws Exception {
-    Path testPath = new Path(TEST_FILE_PREFIX + "_testSequentialReadAfterReverseSeekPerformance");
+    Path testPath = path(
+        TEST_FILE_PREFIX + "_testSequentialReadAfterReverseSeekPerformance");
     assumeHugeFileExists(testPath);
     final int maxAttempts = 10;
     final double maxAcceptableRatio = 1.01;

@@ -446,7 +447,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
   public void testRandomReadPerformance() throws Exception {
     Assume.assumeFalse("This test does not support namespace enabled account",
         getIsNamespaceEnabled(getFileSystem()));
-    Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomReadPerformance");
+    Path testPath = path(TEST_FILE_PREFIX + "_testRandomReadPerformance");
     assumeHugeFileExists(testPath);
 
     final AzureBlobFileSystem abFs = this.getFileSystem();

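Every hunk in this file makes the same substitution: the fixed literal handed to new Path(...) is routed through the path(...) helper inherited from the test setup, so each run reads and writes its own blob. The sketch below is illustrative only (the class name and the 8-character suffix are assumptions, not part of the patch); it shows why a fixed name collides across concurrent runs while a per-run suffix keeps them apart.

    import java.util.UUID;

    import org.apache.hadoop.fs.Path;

    // Illustrative sketch: a fixed literal is identical in every run, while a
    // name carrying a random suffix differs per run, so two parallel runs of
    // the same test never operate on the same blob.
    public class UniqueTestPathSketch {
      public static void main(String[] args) {
        Path fixed = new Path("/testBasicRead");
        Path unique = new Path("/testBasicRead-"
            + UUID.randomUUID().toString().substring(0, 8));
        System.out.println(fixed + " is the same every run");
        System.out.println(unique + " differs every run");
      }
    }
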
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
 
 /**

@@ -72,13 +73,13 @@ public class ITestAzureBlobFileSystemRename extends
   @Test
   public void testRenameFileUnderDir() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
-    Path sourceDir = new Path("/testSrc");
+    Path sourceDir = path("/testSrc");
     assertMkdirs(fs, sourceDir);
     String filename = "file1";
     Path file1 = new Path(sourceDir, filename);
     touch(file1);
 
-    Path destDir = new Path("/testDst");
+    Path destDir = path("/testDst");
     assertRenameOutcome(fs, sourceDir, destDir, true);
     FileStatus[] fileStatus = fs.listStatus(destDir);
     assertNotNull("Null file status", fileStatus);

@@ -90,14 +91,15 @@ public class ITestAzureBlobFileSystemRename extends
   @Test
   public void testRenameDirectory() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
-    fs.mkdirs(new Path("testDir"));
-    Path test1 = new Path("testDir/test1");
+    Path testDir = path("testDir");
+    fs.mkdirs(testDir);
+    Path test1 = new Path(testDir + "/test1");
     fs.mkdirs(test1);
-    fs.mkdirs(new Path("testDir/test1/test2"));
-    fs.mkdirs(new Path("testDir/test1/test2/test3"));
+    fs.mkdirs(new Path(testDir + "/test1/test2"));
+    fs.mkdirs(new Path(testDir + "/test1/test2/test3"));
 
     assertRenameOutcome(fs, test1,
-        new Path("testDir/test10"), true);
+        new Path(testDir + "/test10"), true);
     assertPathDoesNotExist(fs, "rename source dir", test1);
   }
 

@@ -107,8 +109,9 @@ public class ITestAzureBlobFileSystemRename extends
     final List<Future<Void>> tasks = new ArrayList<>();
 
     ExecutorService es = Executors.newFixedThreadPool(10);
+    Path source = path("/test");
     for (int i = 0; i < 1000; i++) {
-      final Path fileName = new Path("/test/" + i);
+      final Path fileName = new Path(source + "/" + i);
       Callable<Void> callable = new Callable<Void>() {
         @Override
         public Void call() throws Exception {

@@ -125,8 +128,7 @@ public class ITestAzureBlobFileSystemRename extends
     }
 
     es.shutdownNow();
-    Path source = new Path("/test");
-    Path dest = new Path("/renamedDir");
+    Path dest = path("/renamedDir");
     assertRenameOutcome(fs, source, dest, true);
 
     FileStatus[] files = fs.listStatus(dest);

@@ -150,14 +152,19 @@ public class ITestAzureBlobFileSystemRename extends
   @Test
   public void testPosixRenameDirectory() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
-    fs.mkdirs(new Path("testDir2/test1/test2/test3"));
-    fs.mkdirs(new Path("testDir2/test4"));
-    Assert.assertTrue(fs.rename(new Path("testDir2/test1/test2/test3"), new Path("testDir2/test4")));
-    assertTrue(fs.exists(new Path("testDir2")));
-    assertTrue(fs.exists(new Path("testDir2/test1/test2")));
-    assertTrue(fs.exists(new Path("testDir2/test4")));
-    assertTrue(fs.exists(new Path("testDir2/test4/test3")));
-    assertFalse(fs.exists(new Path("testDir2/test1/test2/test3")));
+    Path testDir2 = path("testDir2");
+    fs.mkdirs(new Path(testDir2 + "/test1/test2/test3"));
+    fs.mkdirs(new Path(testDir2 + "/test4"));
+    Assert.assertTrue(fs.rename(new Path(testDir2 + "/test1/test2/test3"), new Path(testDir2 + "/test4")));
+    assertPathExists(fs, "This path should exist", testDir2);
+    assertPathExists(fs, "This path should exist",
+        new Path(testDir2 + "/test1/test2"));
+    assertPathExists(fs, "This path should exist",
+        new Path(testDir2 + "/test4"));
+    assertPathExists(fs, "This path should exist",
+        new Path(testDir2 + "/test4/test3"));
+    assertPathDoesNotExist(fs, "This path should not exist",
+        new Path(testDir2 + "/test1/test2/test3"));
   }
 
 }

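In testRenameDirectory and testPosixRenameDirectory only the root directory goes through path(...); the nested children are then built from that root by string concatenation, e.g. new Path(testDir + "/test1/test2"). A small sketch of that pattern, runnable against hadoop-common alone (the literal suffix below is a stand-in for whatever unique name the helper returns):

    import org.apache.hadoop.fs.Path;

    // Illustrative sketch: once the root is unique, every child derived from
    // it is unique as well, so only the root needs the per-run suffix.
    public class ChildPathSketch {
      public static void main(String[] args) {
        Path testDir = new Path("/testDir-3f2a1b");        // stand-in for path("testDir")
        Path nested = new Path(testDir + "/test1/test2");  // concatenation, as in the hunk
        Path viaParent = new Path(testDir, "test1/test2"); // equivalent two-argument form
        System.out.println(nested.equals(viaParent));      // true
      }
    }
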
@@ -76,7 +76,7 @@ public class ITestAzureBlobFileSystemRenameUnicode extends
   @Test
   public void testRenameFileUsingUnicode() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
-    Path folderPath1 = new Path(srcDir);
+    Path folderPath1 = path(srcDir);
     assertMkdirs(fs, folderPath1);
     assertIsDirectory(fs, folderPath1);
     Path filePath = new Path(folderPath1 + "/" + filename);

@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
 import static org.junit.Assume.assumeTrue;
 
 import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;

@@ -1297,7 +1298,7 @@ public class ITestAzureBlobFilesystemAcl extends AbstractAbfsIntegrationTest {
     final Path filePath = new Path(methodName.getMethodName());
     fs.create(filePath);
 
-    assertTrue(fs.exists(filePath));
+    assertPathExists(fs, "This path should exist", filePath);
 
     TracingHeaderValidator tracingHeaderValidator = new TracingHeaderValidator(
         conf.getClientCorrelationId(), fs.getFileSystemId(),

@@ -1320,7 +1321,7 @@ public class ITestAzureBlobFilesystemAcl extends AbstractAbfsIntegrationTest {
     final Path filePath = new Path(methodName.getMethodName());
     fs.create(filePath);
 
-    assertTrue(fs.exists(filePath));
+    assertPathExists(fs, "This path should exist", filePath);
     FsPermission oldPermission = fs.getFileStatus(filePath).getPermission();
     // default permission for non-namespace enabled account is "777"
     FsPermission newPermission = new FsPermission("557");

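The two ACL hunks swap assertTrue(fs.exists(filePath)) for ContractTestUtils.assertPathExists, which fails with a message that names the missing path instead of a bare "expected true" assertion error. A minimal sketch of the call, using only the three-argument signature visible in this diff (the class and method around it are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;

    // Illustrative sketch: the contract helper takes the filesystem, a message
    // and the path, and raises a descriptive failure if the path is absent.
    public class AssertPathExistsSketch {
      static void verifyCreated(FileSystem fs, Path filePath) throws IOException {
        // before the patch: assertTrue(fs.exists(filePath));
        assertPathExists(fs, "This path should exist", filePath);
      }
    }
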
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.UUID;
 
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.contract.ContractTestUtils;

@@ -107,7 +108,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
   @Test
   public void testReadWithCPK() throws Exception {
     final AzureBlobFileSystem fs = getAbfs(true);
-    String fileName = "/" + methodName.getMethodName();
+    String fileName = path("/" + methodName.getMethodName()).toString();
     createFileAndGetContent(fs, fileName, FILE_SIZE);
 
     AbfsClient abfsClient = fs.getAbfsClient();

@@ -157,7 +158,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
   @Test
   public void testReadWithoutCPK() throws Exception {
     final AzureBlobFileSystem fs = getAbfs(false);
-    String fileName = "/" + methodName.getMethodName();
+    String fileName = path("/" + methodName.getMethodName()).toString();
     createFileAndGetContent(fs, fileName, FILE_SIZE);
 
     AbfsClient abfsClient = fs.getAbfsClient();

@@ -196,7 +197,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
   @Test
   public void testAppendWithCPK() throws Exception {
     final AzureBlobFileSystem fs = getAbfs(true);
-    final String fileName = "/" + methodName.getMethodName();
+    final String fileName = path("/" + methodName.getMethodName()).toString();
     createFileAndGetContent(fs, fileName, FILE_SIZE);
 
     // Trying to append with correct CPK headers

@@ -241,7 +242,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
   @Test
   public void testAppendWithoutCPK() throws Exception {
     final AzureBlobFileSystem fs = getAbfs(false);
-    final String fileName = "/" + methodName.getMethodName();
+    final String fileName = path("/" + methodName.getMethodName()).toString();
     createFileAndGetContent(fs, fileName, FILE_SIZE);
 
     // Trying to append without CPK headers

@@ -277,7 +278,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
   @Test
   public void testSetGetXAttr() throws Exception {
     final AzureBlobFileSystem fs = getAbfs(true);
-    String fileName = methodName.getMethodName();
+    final String fileName = path(methodName.getMethodName()).toString();
     createFileAndGetContent(fs, fileName, FILE_SIZE);
 
     String valSent = "testValue";

@@ -325,7 +326,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
     AzureBlobFileSystem fs1 = getAbfs(true);
     int fileSize = FILE_SIZE_FOR_COPY_BETWEEN_ACCOUNTS;
     byte[] fileContent = getRandomBytesArray(fileSize);
-    Path testFilePath = createFileWithContent(fs1, "fs1-file.txt", fileContent);
+    Path testFilePath = createFileWithContent(fs1,
+        String.format("fs1-file%s.txt", UUID.randomUUID()), fileContent);
 
     // Create fs2 with different CPK
     Configuration conf = new Configuration();

@@ -340,7 +342,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
     AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
 
     // Read from fs1 and write to fs2, fs1 and fs2 are having different CPK
-    Path fs2DestFilePath = new Path("fs2-dest-file.txt");
+    Path fs2DestFilePath = new Path(
+        String.format("fs2-dest-file%s.txt", UUID.randomUUID()));
     FSDataOutputStream ops = fs2.create(fs2DestFilePath);
     try (FSDataInputStream iStream = fs1.open(testFilePath)) {
       long totalBytesRead = 0;

@@ -408,8 +411,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
 
   private void testListPath(final boolean isWithCPK) throws Exception {
     final AzureBlobFileSystem fs = getAbfs(isWithCPK);
-    String testDirName = "/" + methodName.getMethodName();
-    final Path testPath = new Path(testDirName);
+    final Path testPath = path("/" + methodName.getMethodName());
+    String testDirName = testPath.toString();
     fs.mkdirs(testPath);
     createFileAndGetContent(fs, testDirName + "/aaa", FILE_SIZE);
     createFileAndGetContent(fs, testDirName + "/bbb", FILE_SIZE);

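Several of these helpers want the name as a String (createFileAndGetContent and the AbfsClient calls take one), so the hunks call path(...).toString() rather than keeping the raw literal. A tiny sketch of that conversion on a plain Path, with a made-up suffix standing in for the unique name:

    import org.apache.hadoop.fs.Path;

    // Illustrative sketch: Path.toString() preserves the unique name, so the
    // String handed to helpers that expect one still points at the same file.
    public class PathToStringSketch {
      public static void main(String[] args) {
        Path unique = new Path("/testReadWithCPK-7c4e");        // stand-in for path(...)
        String fileName = unique.toString();
        System.out.println(fileName);                           // /testReadWithCPK-7c4e
        System.out.println(new Path(fileName).equals(unique));  // true
      }
    }
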
@@ -468,7 +471,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
 
   private void testCreatePath(final boolean isWithCPK) throws Exception {
     final AzureBlobFileSystem fs = getAbfs(isWithCPK);
-    final String testFileName = "/" + methodName.getMethodName();
+    final String testFileName = path("/" + methodName.getMethodName())
+        .toString();
     createFileAndGetContent(fs, testFileName, FILE_SIZE);
 
     AbfsClient abfsClient = fs.getAbfsClient();

@@ -511,7 +515,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
 
   private void testRenamePath(final boolean isWithCPK) throws Exception {
     final AzureBlobFileSystem fs = getAbfs(isWithCPK);
-    final String testFileName = "/" + methodName.getMethodName();
+    final String testFileName = path("/" + methodName.getMethodName())
+        .toString();
     createFileAndGetContent(fs, testFileName, FILE_SIZE);
 
     FileStatus fileStatusBeforeRename = fs

@@ -546,15 +551,17 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
 
   private void testFlush(final boolean isWithCPK) throws Exception {
     final AzureBlobFileSystem fs = getAbfs(isWithCPK);
-    final String testFileName = "/" + methodName.getMethodName();
-    fs.create(new Path(testFileName));
+    final String testFileName = path("/" + methodName.getMethodName())
+        .toString();
+    fs.create(new Path(testFileName)).close();
     AbfsClient abfsClient = fs.getAbfsClient();
     String expectedCPKSha = getCPKSha(fs);
 
     byte[] fileContent = getRandomBytesArray(FILE_SIZE);
     Path testFilePath = new Path(testFileName + "1");
-    FSDataOutputStream oStream = fs.create(testFilePath);
+    try (FSDataOutputStream oStream = fs.create(testFilePath)) {
       oStream.write(fileContent);
+    }
 
     // Trying to read with different CPK headers
     Configuration conf = fs.getConf();

@@ -605,7 +612,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
 
   private void testSetPathProperties(final boolean isWithCPK) throws Exception {
     final AzureBlobFileSystem fs = getAbfs(isWithCPK);
-    final String testFileName = "/" + methodName.getMethodName();
+    final String testFileName = path("/" + methodName.getMethodName())
+        .toString();
     createFileAndGetContent(fs, testFileName, FILE_SIZE);
 
     AbfsClient abfsClient = fs.getAbfsClient();

@@ -635,7 +643,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
 
   private void testGetPathStatusFile(final boolean isWithCPK) throws Exception {
     final AzureBlobFileSystem fs = getAbfs(isWithCPK);
-    final String testFileName = "/" + methodName.getMethodName();
+    final String testFileName = path("/" + methodName.getMethodName())
+        .toString();
     createFileAndGetContent(fs, testFileName, FILE_SIZE);
 
     AbfsClient abfsClient = fs.getAbfsClient();

@@ -672,7 +681,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
 
   private void testDeletePath(final boolean isWithCPK) throws Exception {
     final AzureBlobFileSystem fs = getAbfs(isWithCPK);
-    final String testFileName = "/" + methodName.getMethodName();
+    final String testFileName = path("/" + methodName.getMethodName())
+        .toString();
     createFileAndGetContent(fs, testFileName, FILE_SIZE);
 
     FileStatus[] listStatuses = fs.listStatus(new Path(testFileName));

@@ -702,7 +712,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
 
   private void testSetPermission(final boolean isWithCPK) throws Exception {
     final AzureBlobFileSystem fs = getAbfs(isWithCPK);
-    final String testFileName = "/" + methodName.getMethodName();
+    final String testFileName = path("/" + methodName.getMethodName())
+        .toString();
     Assume.assumeTrue(fs.getIsNamespaceEnabled(getTestTracingContext(fs, false)));
     createFileAndGetContent(fs, testFileName, FILE_SIZE);
     AbfsClient abfsClient = fs.getAbfsClient();

@@ -727,7 +738,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
 
   private void testSetAcl(final boolean isWithCPK) throws Exception {
     final AzureBlobFileSystem fs = getAbfs(isWithCPK);
-    final String testFileName = "/" + methodName.getMethodName();
+    final String testFileName = path("/" + methodName.getMethodName())
+        .toString();
     TracingContext tracingContext = getTestTracingContext(fs, false);
     Assume.assumeTrue(fs.getIsNamespaceEnabled(tracingContext));
     createFileAndGetContent(fs, testFileName, FILE_SIZE);

@@ -756,7 +768,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
 
   private void testGetAcl(final boolean isWithCPK) throws Exception {
     final AzureBlobFileSystem fs = getAbfs(isWithCPK);
-    final String testFileName = "/" + methodName.getMethodName();
+    final String testFileName = path("/" + methodName.getMethodName())
+        .toString();
     TracingContext tracingContext = getTestTracingContext(fs, false);
     Assume.assumeTrue(fs.getIsNamespaceEnabled(tracingContext));
     createFileAndGetContent(fs, testFileName, FILE_SIZE);

@@ -786,8 +799,9 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
         getAuthType() == AuthType.OAuth);
 
     final AzureBlobFileSystem fs = getAbfs(isWithCPK);
-    final String testFileName = "/" + methodName.getMethodName();
-    fs.create(new Path(testFileName));
+    final String testFileName = path("/" + methodName.getMethodName())
+        .toString();
+    fs.create(new Path(testFileName)).close();
     AbfsClient abfsClient = fs.getAbfsClient();
     AbfsRestOperation abfsRestOperation = abfsClient
         .checkAccess(testFileName, "rwx", getTestTracingContext(fs, false));

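Two of the CPK hunks instead embed a random UUID directly in the file name with String.format, which is why java.util.UUID joins the imports. A self-contained sketch of that naming scheme:

    import java.util.UUID;

    // Illustrative sketch: embedding a UUID in the file name gives the
    // copy-between-accounts test a fresh source and destination every run.
    public class UuidFileNameSketch {
      public static void main(String[] args) {
        String source = String.format("fs1-file%s.txt", UUID.randomUUID());
        String destination = String.format("fs2-dest-file%s.txt", UUID.randomUUID());
        System.out.println(source + " -> " + destination);
      }
    }
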
@@ -33,16 +33,22 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
  */
 public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest {
   private static final int TEST_DATA = 100;
-  private static final Path TEST_PATH = new Path("/testfile");
+  private static final String TEST_PATH = "/testfile";
   public ITestFileSystemProperties() throws Exception {
   }
 
   @Test
   public void testReadWriteBytesToFileAndEnsureThreadPoolCleanup() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
-    testWriteOneByteToFileAndEnsureThreadPoolCleanup();
+    Path testPath = path(TEST_PATH);
+    try(FSDataOutputStream stream = fs.create(testPath)) {
+      stream.write(TEST_DATA);
+    }
 
-    try(FSDataInputStream inputStream = fs.open(TEST_PATH, 4 * 1024 * 1024)) {
+    FileStatus fileStatus = fs.getFileStatus(testPath);
+    assertEquals(1, fileStatus.getLen());
+
+    try(FSDataInputStream inputStream = fs.open(testPath, 4 * 1024 * 1024)) {
       int i = inputStream.read();
       assertEquals(TEST_DATA, i);
     }

@@ -51,11 +57,12 @@ public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest {
   @Test
   public void testWriteOneByteToFileAndEnsureThreadPoolCleanup() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
-    try(FSDataOutputStream stream = fs.create(TEST_PATH)) {
+    Path testPath = path(TEST_PATH);
+    try(FSDataOutputStream stream = fs.create(testPath)) {
       stream.write(TEST_DATA);
     }
 
-    FileStatus fileStatus = fs.getFileStatus(TEST_PATH);
+    FileStatus fileStatus = fs.getFileStatus(testPath);
     assertEquals(1, fileStatus.getLen());
   }
 

@@ -78,11 +85,12 @@ public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest {
     final AzureBlobFileSystem fs = getFileSystem();
     final Hashtable<String, String> properties = new Hashtable<>();
     properties.put("key", "{ value: valueTest }");
-    touch(TEST_PATH);
+    Path testPath = path(TEST_PATH);
+    touch(testPath);
     TracingContext tracingContext = getTestTracingContext(fs, true);
-    fs.getAbfsStore().setPathProperties(TEST_PATH, properties, tracingContext);
+    fs.getAbfsStore().setPathProperties(testPath, properties, tracingContext);
     Hashtable<String, String> fetchedProperties = fs.getAbfsStore()
-        .getPathStatus(TEST_PATH, tracingContext);
+        .getPathStatus(testPath, tracingContext);
 
     assertEquals(properties, fetchedProperties);
   }

@@ -105,11 +113,12 @@ public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest {
     final AzureBlobFileSystem fs = getFileSystem();
     final Hashtable<String, String> properties = new Hashtable<>();
     properties.put("key", "{ value: valueTest兩 }");
-    touch(TEST_PATH);
+    Path testPath = path(TEST_PATH);
+    touch(testPath);
     TracingContext tracingContext = getTestTracingContext(fs, true);
-    fs.getAbfsStore().setPathProperties(TEST_PATH, properties, tracingContext);
+    fs.getAbfsStore().setPathProperties(testPath, properties, tracingContext);
     Hashtable<String, String> fetchedProperties = fs.getAbfsStore()
-        .getPathStatus(TEST_PATH, tracingContext);
+        .getPathStatus(testPath, tracingContext);
 
     assertEquals(properties, fetchedProperties);
   }

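TEST_PATH changes from a Path constant to a String constant so that each test can resolve it to its own Path at run time; the rewritten first test then creates the file itself through a try-with-resources stream and checks the length before reading back. A local-filesystem sketch of that write-then-verify shape (the /tmp path and nanoTime suffix are illustrative, not taken from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative sketch: write a single int through a try-with-resources
    // stream, then confirm the file holds exactly one byte.
    public class WriteOneByteSketch {
      private static final int TEST_DATA = 100;

      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path testPath = new Path("/tmp/testfile-" + System.nanoTime());
        try (FSDataOutputStream stream = fs.create(testPath)) {
          stream.write(TEST_DATA);   // write(int) stores the low-order byte
        }
        FileStatus fileStatus = fs.getFileStatus(testPath);
        System.out.println(fileStatus.getLen());   // 1
      }
    }
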
@@ -62,7 +62,8 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
     NativeAzureFileSystem wasb = getWasbFileSystem();
 
-    Path path1 = new Path("/testfiles/~12/!008/3/abFsTestfile");
+    Path testFiles = path("/testfiles");
+    Path path1 = new Path(testFiles + "/~12/!008/3/abFsTestfile");
     try(FSDataOutputStream abfsStream = fs.create(path1, true)) {
       abfsStream.write(ABFS_TEST_CONTEXT.getBytes());
       abfsStream.flush();

@@ -70,7 +71,7 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
     }
 
     // create file using wasb
-    Path path2 = new Path("/testfiles/~12/!008/3/nativeFsTestfile");
+    Path path2 = new Path(testFiles + "/~12/!008/3/nativeFsTestfile");
     LOG.info("{}", wasb.getUri());
     try(FSDataOutputStream nativeFsStream = wasb.create(path2, true)) {
       nativeFsStream.write(WASB_TEST_CONTEXT.getBytes());

@@ -78,8 +79,8 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
       nativeFsStream.hsync();
     }
     // list file using abfs and wasb
-    FileStatus[] abfsFileStatus = fs.listStatus(new Path("/testfiles/~12/!008/3/"));
-    FileStatus[] nativeFsFileStatus = wasb.listStatus(new Path("/testfiles/~12/!008/3/"));
+    FileStatus[] abfsFileStatus = fs.listStatus(new Path(testFiles + "/~12/!008/3/"));
+    FileStatus[] nativeFsFileStatus = wasb.listStatus(new Path(testFiles + "/~12/!008/3/"));
 
     assertEquals(2, abfsFileStatus.length);
     assertEquals(2, nativeFsFileStatus.length);

@@ -97,8 +98,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
     NativeAzureFileSystem wasb = getWasbFileSystem();
 
+    Path testFile = path("/testReadFile");
     for (int i = 0; i< 4; i++) {
-      Path path = new Path("/testReadFile/~12/!008/testfile" + i);
+      Path path = new Path(testFile + "/~12/!008/testfile" + i);
       final FileSystem createFs = createFileWithAbfs[i] ? abfs : wasb;
 
       // Write

@@ -137,8 +139,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
     NativeAzureFileSystem wasb = getWasbFileSystem();
 
+    Path testDir = path("/testDir");
     for (int i = 0; i < 4; i++) {
-      Path path = new Path("/testDir/t" + i);
+      Path path = new Path(testDir + "/t" + i);
       //create
       final FileSystem createFs = createDirWithAbfs[i] ? abfs : wasb;
       assertTrue(createFs.mkdirs(path));

@@ -172,11 +175,12 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
     NativeAzureFileSystem wasb = getWasbFileSystem();
 
-    Path d1d4 = new Path("/d1/d2/d3/d4");
+    Path d1 = path("/d1");
+    Path d1d4 = new Path(d1 + "/d2/d3/d4");
     assertMkdirs(abfs, d1d4);
 
     //set working directory to path1
-    Path path1 = new Path("/d1/d2");
+    Path path1 = new Path(d1 + "/d2");
     wasb.setWorkingDirectory(path1);
     abfs.setWorkingDirectory(path1);
     assertEquals(path1, wasb.getWorkingDirectory());

@@ -23,7 +23,6 @@ import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;

@@ -32,7 +31,7 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
  * Test create operation.
  */
 public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
-  private static final Path TEST_FILE_PATH = new Path("testfile");
+  private static final String TEST_FILE_PATH = "testfile";
 
   public ITestAbfsOutputStream() throws Exception {
     super();

@@ -42,7 +41,7 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
   public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
     Configuration conf = getRawConfiguration();
     final AzureBlobFileSystem fs = getFileSystem(conf);
-    try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
+    try (FSDataOutputStream out = fs.create(path(TEST_FILE_PATH))) {
       AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
 
       int maxConcurrentRequests

@@ -71,19 +70,18 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
     conf.set(ConfigurationKeys.AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
         "" + maxRequestsToQueue);
     final AzureBlobFileSystem fs = getFileSystem(conf);
-    FSDataOutputStream out = fs.create(TEST_FILE_PATH);
+    try (FSDataOutputStream out = fs.create(path(TEST_FILE_PATH))) {
       AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
 
       if (stream.isAppendBlobStream()) {
         maxConcurrentRequests = 1;
       }
 
-    Assertions.assertThat(stream.getMaxConcurrentRequestCount())
-        .describedAs("maxConcurrentRequests should be " + maxConcurrentRequests)
-        .isEqualTo(maxConcurrentRequests);
-    Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued())
-        .describedAs("maxRequestsToQueue should be " + maxRequestsToQueue)
+      Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
+          "maxConcurrentRequests should be " + maxConcurrentRequests).isEqualTo(maxConcurrentRequests);
+      Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs("maxRequestsToQueue should be " + maxRequestsToQueue)
           .isEqualTo(maxRequestsToQueue);
+    }
   }
 
 }

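Besides switching TEST_FILE_PATH to a String resolved through path(...), the second test is rewrapped in try-with-resources so the output stream is always closed, and its AssertJ chains are reflowed. A minimal sketch of the describedAs/isEqualTo chain those assertions rely on (the value 4 is an arbitrary stand-in):

    import org.assertj.core.api.Assertions;

    // Illustrative sketch: describedAs attaches the message that AssertJ
    // prints if the following isEqualTo comparison fails.
    public class AssertJChainSketch {
      public static void main(String[] args) {
        int maxConcurrentRequests = 4;
        Assertions.assertThat(maxConcurrentRequests)
            .describedAs("maxConcurrentRequests should be " + 4)
            .isEqualTo(4);
      }
    }
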
@@ -583,13 +583,14 @@ public class TestAbfsInputStream extends
     Configuration config = getRawConfiguration();
     config.unset(FS_AZURE_READ_AHEAD_QUEUE_DEPTH);
     AzureBlobFileSystem fs = getFileSystem(config);
-    Path testFile = new Path("/testFile");
-    fs.create(testFile);
+    Path testFile = path("/testFile");
+    fs.create(testFile).close();
     FSDataInputStream in = fs.open(testFile);
     Assertions.assertThat(
         ((AbfsInputStream) in.getWrappedStream()).getReadAheadQueueDepth())
         .describedAs("readahead queue depth should be set to default value 2")
         .isEqualTo(2);
     in.close();
   }
 

@@ -646,8 +647,7 @@ public class TestAbfsInputStream extends
       readAheadBlockSize = readRequestSize;
     }
 
-    Path testPath = new Path(
-        "/testReadAheadConfigs");
+    Path testPath = path("/testReadAheadConfigs");
     final AzureBlobFileSystem fs = createTestFile(testPath,
         ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, config);
     byte[] byteBuffer = new byte[ONE_MB];

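Several hunks in this patch (here and in the CPK and create tests above) also replace a bare fs.create(testFile) with fs.create(testFile).close(): create returns an open output stream, and a test that only needs the file to exist should close that stream immediately rather than leak it. A local-filesystem sketch of the idiom (path and suffix are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative sketch: create-and-close leaves an empty file behind with
    // no open stream left to clean up.
    public class CreateAndCloseSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path testFile = new Path("/tmp/testFile-" + System.nanoTime());
        fs.create(testFile).close();
        System.out.println(fs.exists(testFile));   // true
      }
    }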