HADOOP-17765. ABFS: Use Unique File Paths in Tests. (#3153)
Contributed by Sumangala Patki
This commit is contained in:
parent
aecfcf165f
commit
10ba4cc892
|
@ -26,11 +26,11 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -84,6 +84,7 @@ public abstract class AbstractAbfsIntegrationTest extends
|
||||||
private AuthType authType;
|
private AuthType authType;
|
||||||
private boolean useConfiguredFileSystem = false;
|
private boolean useConfiguredFileSystem = false;
|
||||||
private boolean usingFilesystemForSASTests = false;
|
private boolean usingFilesystemForSASTests = false;
|
||||||
|
private static final int SHORTENED_GUID_LEN = 12;
|
||||||
|
|
||||||
protected AbstractAbfsIntegrationTest() throws Exception {
|
protected AbstractAbfsIntegrationTest() throws Exception {
|
||||||
fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
|
fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
|
||||||
|
@ -270,7 +271,8 @@ public abstract class AbstractAbfsIntegrationTest extends
|
||||||
// so first create temporary instance of the filesystem using SharedKey
|
// so first create temporary instance of the filesystem using SharedKey
|
||||||
// then re-use the filesystem it creates with SAS auth instead of SharedKey.
|
// then re-use the filesystem it creates with SAS auth instead of SharedKey.
|
||||||
AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
|
AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
|
||||||
Assert.assertTrue(tempFs.exists(new Path("/")));
|
ContractTestUtils.assertPathExists(tempFs, "This path should exist",
|
||||||
|
new Path("/"));
|
||||||
abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
|
abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
|
||||||
usingFilesystemForSASTests = true;
|
usingFilesystemForSASTests = true;
|
||||||
}
|
}
|
||||||
|
@ -440,7 +442,20 @@ public abstract class AbstractAbfsIntegrationTest extends
|
||||||
*/
|
*/
|
||||||
protected Path path(String filepath) throws IOException {
|
protected Path path(String filepath) throws IOException {
|
||||||
return getFileSystem().makeQualified(
|
return getFileSystem().makeQualified(
|
||||||
new Path(getTestPath(), filepath));
|
new Path(getTestPath(), getUniquePath(filepath)));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate a unique path using the given filepath.
|
||||||
|
* @param filepath path string
|
||||||
|
* @return unique path created from filepath and a GUID
|
||||||
|
*/
|
||||||
|
protected Path getUniquePath(String filepath) {
|
||||||
|
if (filepath.equals("/")) {
|
||||||
|
return new Path(filepath);
|
||||||
|
}
|
||||||
|
return new Path(filepath + StringUtils
|
||||||
|
.right(UUID.randomUUID().toString(), SHORTENED_GUID_LEN));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -93,7 +93,7 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
|
||||||
public void testListPathWithValidListMaxResultsValues()
|
public void testListPathWithValidListMaxResultsValues()
|
||||||
throws IOException, ExecutionException, InterruptedException {
|
throws IOException, ExecutionException, InterruptedException {
|
||||||
final int fileCount = 10;
|
final int fileCount = 10;
|
||||||
final String directory = "testWithValidListMaxResultsValues";
|
final Path directory = getUniquePath("testWithValidListMaxResultsValues");
|
||||||
createDirectoryWithNFiles(directory, fileCount);
|
createDirectoryWithNFiles(directory, fileCount);
|
||||||
final int[] testData = {fileCount + 100, fileCount + 1, fileCount,
|
final int[] testData = {fileCount + 100, fileCount + 1, fileCount,
|
||||||
fileCount - 1, 1};
|
fileCount - 1, 1};
|
||||||
|
@ -102,7 +102,7 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
|
||||||
setListMaxResults(listMaxResults);
|
setListMaxResults(listMaxResults);
|
||||||
int expectedListResultsSize =
|
int expectedListResultsSize =
|
||||||
listMaxResults > fileCount ? fileCount : listMaxResults;
|
listMaxResults > fileCount ? fileCount : listMaxResults;
|
||||||
Assertions.assertThat(listPath(directory)).describedAs(
|
Assertions.assertThat(listPath(directory.toString())).describedAs(
|
||||||
"AbfsClient.listPath result should contain %d items when "
|
"AbfsClient.listPath result should contain %d items when "
|
||||||
+ "listMaxResults is %d and directory contains %d items",
|
+ "listMaxResults is %d and directory contains %d items",
|
||||||
expectedListResultsSize, listMaxResults, fileCount)
|
expectedListResultsSize, listMaxResults, fileCount)
|
||||||
|
@ -114,9 +114,10 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
|
||||||
public void testListPathWithValueGreaterThanServerMaximum()
|
public void testListPathWithValueGreaterThanServerMaximum()
|
||||||
throws IOException, ExecutionException, InterruptedException {
|
throws IOException, ExecutionException, InterruptedException {
|
||||||
setListMaxResults(LIST_MAX_RESULTS_SERVER + 100);
|
setListMaxResults(LIST_MAX_RESULTS_SERVER + 100);
|
||||||
final String directory = "testWithValueGreaterThanServerMaximum";
|
final Path directory = getUniquePath(
|
||||||
|
"testWithValueGreaterThanServerMaximum");
|
||||||
createDirectoryWithNFiles(directory, LIST_MAX_RESULTS_SERVER + 200);
|
createDirectoryWithNFiles(directory, LIST_MAX_RESULTS_SERVER + 200);
|
||||||
Assertions.assertThat(listPath(directory)).describedAs(
|
Assertions.assertThat(listPath(directory.toString())).describedAs(
|
||||||
"AbfsClient.listPath result will contain a maximum of %d items "
|
"AbfsClient.listPath result will contain a maximum of %d items "
|
||||||
+ "even if listMaxResults >= %d or directory "
|
+ "even if listMaxResults >= %d or directory "
|
||||||
+ "contains more than %d items", LIST_MAX_RESULTS_SERVER,
|
+ "contains more than %d items", LIST_MAX_RESULTS_SERVER,
|
||||||
|
@ -152,7 +153,7 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
|
||||||
.setListMaxResults(listMaxResults);
|
.setListMaxResults(listMaxResults);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createDirectoryWithNFiles(String directory, int n)
|
private void createDirectoryWithNFiles(Path directory, int n)
|
||||||
throws ExecutionException, InterruptedException {
|
throws ExecutionException, InterruptedException {
|
||||||
final List<Future<Void>> tasks = new ArrayList<>();
|
final List<Future<Void>> tasks = new ArrayList<>();
|
||||||
ExecutorService es = Executors.newFixedThreadPool(10);
|
ExecutorService es = Executors.newFixedThreadPool(10);
|
||||||
|
|
|
@ -237,8 +237,8 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe
|
||||||
@Test
|
@Test
|
||||||
public void testHasNextForFile() throws Exception {
|
public void testHasNextForFile() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
String testFileName = "testFile";
|
Path testFile = path("testFile");
|
||||||
Path testFile = new Path(testFileName);
|
String testFileName = testFile.toString();
|
||||||
getFileSystem().create(testFile);
|
getFileSystem().create(testFile);
|
||||||
setPageSize(10);
|
setPageSize(10);
|
||||||
RemoteIterator<FileStatus> fsItr = fs.listStatusIterator(testFile);
|
RemoteIterator<FileStatus> fsItr = fs.listStatusIterator(testFile);
|
||||||
|
@ -304,7 +304,7 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe
|
||||||
|
|
||||||
private Path createTestDirectory() throws IOException {
|
private Path createTestDirectory() throws IOException {
|
||||||
String testDirectoryName = "testDirectory" + System.currentTimeMillis();
|
String testDirectoryName = "testDirectory" + System.currentTimeMillis();
|
||||||
Path testDirectory = new Path(testDirectoryName);
|
Path testDirectory = path(testDirectoryName);
|
||||||
getFileSystem().mkdirs(testDirectory);
|
getFileSystem().mkdirs(testDirectory);
|
||||||
return testDirectory;
|
return testDirectory;
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.M
|
||||||
*/
|
*/
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
|
public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
|
||||||
private static final Path TEST_PATH = new Path("/testfile");
|
private static final String TEST_PATH = "/testfile";
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "Size={0}")
|
@Parameterized.Parameters(name = "Size={0}")
|
||||||
public static Iterable<Object[]> sizes() {
|
public static Iterable<Object[]> sizes() {
|
||||||
|
@ -75,13 +75,14 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
|
||||||
final byte[] b = new byte[2 * bufferSize];
|
final byte[] b = new byte[2 * bufferSize];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
|
|
||||||
try (FSDataOutputStream stream = fs.create(TEST_PATH)) {
|
Path testPath = path(TEST_PATH);
|
||||||
|
try (FSDataOutputStream stream = fs.create(testPath)) {
|
||||||
stream.write(b);
|
stream.write(b);
|
||||||
}
|
}
|
||||||
|
|
||||||
final byte[] readBuffer = new byte[2 * bufferSize];
|
final byte[] readBuffer = new byte[2 * bufferSize];
|
||||||
int result;
|
int result;
|
||||||
try (FSDataInputStream inputStream = fs.open(TEST_PATH)) {
|
try (FSDataInputStream inputStream = fs.open(testPath)) {
|
||||||
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
|
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
|
||||||
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
|
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
|
||||||
fs.getFileSystemId(), FSOperationType.READ, true, 0,
|
fs.getFileSystemId(), FSOperationType.READ, true, 0,
|
||||||
|
@ -112,7 +113,8 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
|
||||||
|
|
||||||
final byte[] b = new byte[bufferSize * 10];
|
final byte[] b = new byte[bufferSize * 10];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
try (FSDataOutputStream stream = fs.create(TEST_PATH)) {
|
Path testPath = path(TEST_PATH);
|
||||||
|
try (FSDataOutputStream stream = fs.create(testPath)) {
|
||||||
((AbfsOutputStream) stream.getWrappedStream()).registerListener(
|
((AbfsOutputStream) stream.getWrappedStream()).registerListener(
|
||||||
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
|
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
|
||||||
fs.getFileSystemId(), FSOperationType.WRITE, false, 0,
|
fs.getFileSystemId(), FSOperationType.WRITE, false, 0,
|
||||||
|
@ -126,7 +128,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
|
||||||
fs.registerListener(
|
fs.registerListener(
|
||||||
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
|
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
|
||||||
fs.getFileSystemId(), FSOperationType.OPEN, false, 0));
|
fs.getFileSystemId(), FSOperationType.OPEN, false, 0));
|
||||||
try (FSDataInputStream inputStream = fs.open(TEST_PATH)) {
|
try (FSDataInputStream inputStream = fs.open(testPath)) {
|
||||||
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
|
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
|
||||||
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
|
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
|
||||||
fs.getFileSystemId(), FSOperationType.READ, false, 0,
|
fs.getFileSystemId(), FSOperationType.READ, false, 0,
|
||||||
|
|
|
@ -91,7 +91,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
fs.mkdirs(createDirectoryPath);
|
fs.mkdirs(createDirectoryPath);
|
||||||
fs.createNonRecursive(createFilePath, FsPermission
|
fs.createNonRecursive(createFilePath, FsPermission
|
||||||
.getDefault(), false, 1024, (short) 1, 1024, null);
|
.getDefault(), false, 1024, (short) 1, 1024, null).close();
|
||||||
|
|
||||||
Map<String, Long> metricMap = fs.getInstrumentationMap();
|
Map<String, Long> metricMap = fs.getInstrumentationMap();
|
||||||
/*
|
/*
|
||||||
|
@ -117,7 +117,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
|
||||||
fs.mkdirs(path(getMethodName() + "Dir" + i));
|
fs.mkdirs(path(getMethodName() + "Dir" + i));
|
||||||
fs.createNonRecursive(path(getMethodName() + i),
|
fs.createNonRecursive(path(getMethodName() + i),
|
||||||
FsPermission.getDefault(), false, 1024, (short) 1,
|
FsPermission.getDefault(), false, 1024, (short) 1,
|
||||||
1024, null);
|
1024, null).close();
|
||||||
}
|
}
|
||||||
|
|
||||||
metricMap = fs.getInstrumentationMap();
|
metricMap = fs.getInstrumentationMap();
|
||||||
|
@ -160,7 +160,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
|
||||||
files_deleted counters.
|
files_deleted counters.
|
||||||
*/
|
*/
|
||||||
fs.mkdirs(createDirectoryPath);
|
fs.mkdirs(createDirectoryPath);
|
||||||
fs.create(path(createDirectoryPath + getMethodName()));
|
fs.create(path(createDirectoryPath + getMethodName())).close();
|
||||||
fs.delete(createDirectoryPath, true);
|
fs.delete(createDirectoryPath, true);
|
||||||
|
|
||||||
Map<String, Long> metricMap = fs.getInstrumentationMap();
|
Map<String, Long> metricMap = fs.getInstrumentationMap();
|
||||||
|
@ -179,7 +179,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
|
||||||
directories_deleted is called or not.
|
directories_deleted is called or not.
|
||||||
*/
|
*/
|
||||||
fs.mkdirs(createDirectoryPath);
|
fs.mkdirs(createDirectoryPath);
|
||||||
fs.create(createFilePath);
|
fs.create(createFilePath).close();
|
||||||
fs.delete(createDirectoryPath, true);
|
fs.delete(createDirectoryPath, true);
|
||||||
metricMap = fs.getInstrumentationMap();
|
metricMap = fs.getInstrumentationMap();
|
||||||
|
|
||||||
|
@ -199,9 +199,9 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
|
||||||
Path createFilePath = path(getMethodName());
|
Path createFilePath = path(getMethodName());
|
||||||
Path destCreateFilePath = path(getMethodName() + "New");
|
Path destCreateFilePath = path(getMethodName() + "New");
|
||||||
|
|
||||||
fs.create(createFilePath);
|
fs.create(createFilePath).close();
|
||||||
fs.open(createFilePath);
|
fs.open(createFilePath).close();
|
||||||
fs.append(createFilePath);
|
fs.append(createFilePath).close();
|
||||||
assertTrue(fs.rename(createFilePath, destCreateFilePath));
|
assertTrue(fs.rename(createFilePath, destCreateFilePath));
|
||||||
|
|
||||||
Map<String, Long> metricMap = fs.getInstrumentationMap();
|
Map<String, Long> metricMap = fs.getInstrumentationMap();
|
||||||
|
@ -225,11 +225,11 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
|
||||||
//re-initialising Abfs to reset statistic values.
|
//re-initialising Abfs to reset statistic values.
|
||||||
fs.initialize(fs.getUri(), fs.getConf());
|
fs.initialize(fs.getUri(), fs.getConf());
|
||||||
|
|
||||||
fs.create(destCreateFilePath);
|
fs.create(destCreateFilePath).close();
|
||||||
|
|
||||||
for (int i = 0; i < NUMBER_OF_OPS; i++) {
|
for (int i = 0; i < NUMBER_OF_OPS; i++) {
|
||||||
fs.open(destCreateFilePath);
|
fs.open(destCreateFilePath);
|
||||||
fs.append(destCreateFilePath);
|
fs.append(destCreateFilePath).close();
|
||||||
}
|
}
|
||||||
|
|
||||||
metricMap = fs.getInstrumentationMap();
|
metricMap = fs.getInstrumentationMap();
|
||||||
|
|
|
@ -52,8 +52,8 @@ public class ITestAbfsStreamStatistics extends AbstractAbfsIntegrationTest {
|
||||||
+ "Abfs");
|
+ "Abfs");
|
||||||
|
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path smallOperationsFile = new Path("testOneReadWriteOps");
|
Path smallOperationsFile = path("testOneReadWriteOps");
|
||||||
Path largeOperationsFile = new Path("testLargeReadWriteOps");
|
Path largeOperationsFile = path("testLargeReadWriteOps");
|
||||||
FileSystem.Statistics statistics = fs.getFsStatistics();
|
FileSystem.Statistics statistics = fs.getFsStatistics();
|
||||||
String testReadWriteOps = "test this";
|
String testReadWriteOps = "test this";
|
||||||
statistics.reset();
|
statistics.reset();
|
||||||
|
|
|
@ -35,8 +35,8 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
*/
|
*/
|
||||||
public class ITestAzureBlobFileSystemAppend extends
|
public class ITestAzureBlobFileSystemAppend extends
|
||||||
AbstractAbfsIntegrationTest {
|
AbstractAbfsIntegrationTest {
|
||||||
private static final Path TEST_FILE_PATH = new Path("testfile");
|
private static final String TEST_FILE_PATH = "testfile";
|
||||||
private static final Path TEST_FOLDER_PATH = new Path("testFolder");
|
private static final String TEST_FOLDER_PATH = "testFolder";
|
||||||
|
|
||||||
public ITestAzureBlobFileSystemAppend() throws Exception {
|
public ITestAzureBlobFileSystemAppend() throws Exception {
|
||||||
super();
|
super();
|
||||||
|
@ -45,15 +45,15 @@ public class ITestAzureBlobFileSystemAppend extends
|
||||||
@Test(expected = FileNotFoundException.class)
|
@Test(expected = FileNotFoundException.class)
|
||||||
public void testAppendDirShouldFail() throws Exception {
|
public void testAppendDirShouldFail() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
final Path filePath = TEST_FILE_PATH;
|
final Path filePath = path(TEST_FILE_PATH);
|
||||||
fs.mkdirs(filePath);
|
fs.mkdirs(filePath);
|
||||||
fs.append(filePath, 0);
|
fs.append(filePath, 0).close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAppendWithLength0() throws Exception {
|
public void testAppendWithLength0() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
try(FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
try(FSDataOutputStream stream = fs.create(path(TEST_FILE_PATH))) {
|
||||||
final byte[] b = new byte[1024];
|
final byte[] b = new byte[1024];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
stream.write(b, 1000, 0);
|
stream.write(b, 1000, 0);
|
||||||
|
@ -65,28 +65,29 @@ public class ITestAzureBlobFileSystemAppend extends
|
||||||
@Test(expected = FileNotFoundException.class)
|
@Test(expected = FileNotFoundException.class)
|
||||||
public void testAppendFileAfterDelete() throws Exception {
|
public void testAppendFileAfterDelete() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
final Path filePath = TEST_FILE_PATH;
|
final Path filePath = path(TEST_FILE_PATH);
|
||||||
ContractTestUtils.touch(fs, filePath);
|
ContractTestUtils.touch(fs, filePath);
|
||||||
fs.delete(filePath, false);
|
fs.delete(filePath, false);
|
||||||
|
|
||||||
fs.append(filePath);
|
fs.append(filePath).close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = FileNotFoundException.class)
|
@Test(expected = FileNotFoundException.class)
|
||||||
public void testAppendDirectory() throws Exception {
|
public void testAppendDirectory() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
final Path folderPath = TEST_FOLDER_PATH;
|
final Path folderPath = path(TEST_FOLDER_PATH);
|
||||||
fs.mkdirs(folderPath);
|
fs.mkdirs(folderPath);
|
||||||
fs.append(folderPath);
|
fs.append(folderPath).close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTracingForAppend() throws IOException {
|
public void testTracingForAppend() throws IOException {
|
||||||
AzureBlobFileSystem fs = getFileSystem();
|
AzureBlobFileSystem fs = getFileSystem();
|
||||||
fs.create(TEST_FILE_PATH);
|
Path testPath = path(TEST_FILE_PATH);
|
||||||
|
fs.create(testPath).close();
|
||||||
fs.registerListener(new TracingHeaderValidator(
|
fs.registerListener(new TracingHeaderValidator(
|
||||||
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
|
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
|
||||||
fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
|
fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
|
||||||
fs.append(TEST_FILE_PATH, 10);
|
fs.append(testPath, 10);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,7 +99,7 @@ public class ITestAzureBlobFileSystemAuthorization extends AbstractAbfsIntegrati
|
||||||
this.getConfiguration().getRawConfiguration());
|
this.getConfiguration().getRawConfiguration());
|
||||||
intercept(SASTokenProviderException.class,
|
intercept(SASTokenProviderException.class,
|
||||||
() -> {
|
() -> {
|
||||||
testFs.create(new org.apache.hadoop.fs.Path("/testFile"));
|
testFs.create(new org.apache.hadoop.fs.Path("/testFile")).close();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,7 +114,7 @@ public class ITestAzureBlobFileSystemAuthorization extends AbstractAbfsIntegrati
|
||||||
testFs.initialize(fs.getUri(), this.getConfiguration().getRawConfiguration());
|
testFs.initialize(fs.getUri(), this.getConfiguration().getRawConfiguration());
|
||||||
intercept(SASTokenProviderException.class,
|
intercept(SASTokenProviderException.class,
|
||||||
()-> {
|
()-> {
|
||||||
testFs.create(new org.apache.hadoop.fs.Path("/testFile"));
|
testFs.create(new org.apache.hadoop.fs.Path("/testFile")).close();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,7 +297,7 @@ public class ITestAzureBlobFileSystemAuthorization extends AbstractAbfsIntegrati
|
||||||
fs.listStatus(reqPath);
|
fs.listStatus(reqPath);
|
||||||
break;
|
break;
|
||||||
case CreatePath:
|
case CreatePath:
|
||||||
fs.create(reqPath);
|
fs.create(reqPath).close();
|
||||||
break;
|
break;
|
||||||
case RenamePath:
|
case RenamePath:
|
||||||
fs.rename(reqPath,
|
fs.rename(reqPath,
|
||||||
|
|
|
@ -50,13 +50,16 @@ public class ITestAzureBlobFileSystemBackCompat extends
|
||||||
CloudBlobContainer container = blobClient.getContainerReference(this.getFileSystemName());
|
CloudBlobContainer container = blobClient.getContainerReference(this.getFileSystemName());
|
||||||
container.createIfNotExists();
|
container.createIfNotExists();
|
||||||
|
|
||||||
CloudBlockBlob blockBlob = container.getBlockBlobReference("test/10/10/10");
|
Path testPath = getUniquePath("test");
|
||||||
|
CloudBlockBlob blockBlob = container
|
||||||
|
.getBlockBlobReference(testPath + "/10/10/10");
|
||||||
blockBlob.uploadText("");
|
blockBlob.uploadText("");
|
||||||
|
|
||||||
blockBlob = container.getBlockBlobReference("test/10/123/3/2/1/3");
|
blockBlob = container.getBlockBlobReference(testPath + "/10/123/3/2/1/3");
|
||||||
blockBlob.uploadText("");
|
blockBlob.uploadText("");
|
||||||
|
|
||||||
FileStatus[] fileStatuses = fs.listStatus(new Path("/test/10/"));
|
FileStatus[] fileStatuses = fs
|
||||||
|
.listStatus(new Path(String.format("/%s/10/", testPath)));
|
||||||
assertEquals(2, fileStatuses.length);
|
assertEquals(2, fileStatuses.length);
|
||||||
assertEquals("10", fileStatuses[0].getPath().getName());
|
assertEquals("10", fileStatuses[0].getPath().getName());
|
||||||
assertTrue(fileStatuses[0].isDirectory());
|
assertTrue(fileStatuses[0].isDirectory());
|
||||||
|
|
|
@ -352,7 +352,8 @@ public class ITestAzureBlobFileSystemCheckAccess
|
||||||
|
|
||||||
private Path setupTestDirectoryAndUserAccess(String testFileName,
|
private Path setupTestDirectoryAndUserAccess(String testFileName,
|
||||||
FsAction fsAction) throws Exception {
|
FsAction fsAction) throws Exception {
|
||||||
Path file = new Path(TEST_FOLDER_PATH + testFileName);
|
Path testPath = path(TEST_FOLDER_PATH);
|
||||||
|
Path file = new Path(testPath + testFileName);
|
||||||
file = this.superUserFs.makeQualified(file);
|
file = this.superUserFs.makeQualified(file);
|
||||||
this.superUserFs.delete(file, true);
|
this.superUserFs.delete(file, true);
|
||||||
this.superUserFs.create(file);
|
this.superUserFs.create(file);
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class ITestAzureBlobFileSystemCopy extends AbstractAbfsIntegrationTest {
|
||||||
localFs.delete(localFilePath, true);
|
localFs.delete(localFilePath, true);
|
||||||
try {
|
try {
|
||||||
writeString(localFs, localFilePath, "Testing");
|
writeString(localFs, localFilePath, "Testing");
|
||||||
Path dstPath = new Path("copiedFromLocal");
|
Path dstPath = path("copiedFromLocal");
|
||||||
assertTrue(FileUtil.copy(localFs, localFilePath, fs, dstPath, false,
|
assertTrue(FileUtil.copy(localFs, localFilePath, fs, dstPath, false,
|
||||||
fs.getConf()));
|
fs.getConf()));
|
||||||
assertIsFile(fs, dstPath);
|
assertIsFile(fs, dstPath);
|
||||||
|
|
|
@ -69,7 +69,7 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
||||||
public class ITestAzureBlobFileSystemCreate extends
|
public class ITestAzureBlobFileSystemCreate extends
|
||||||
AbstractAbfsIntegrationTest {
|
AbstractAbfsIntegrationTest {
|
||||||
private static final Path TEST_FILE_PATH = new Path("testfile");
|
private static final Path TEST_FILE_PATH = new Path("testfile");
|
||||||
private static final Path TEST_FOLDER_PATH = new Path("testFolder");
|
private static final String TEST_FOLDER_PATH = "testFolder";
|
||||||
private static final String TEST_CHILD_FILE = "childFile";
|
private static final String TEST_CHILD_FILE = "childFile";
|
||||||
|
|
||||||
public ITestAzureBlobFileSystemCreate() throws Exception {
|
public ITestAzureBlobFileSystemCreate() throws Exception {
|
||||||
|
@ -92,7 +92,8 @@ public class ITestAzureBlobFileSystemCreate extends
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public void testCreateNonRecursive() throws Exception {
|
public void testCreateNonRecursive() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
|
Path testFolderPath = path(TEST_FOLDER_PATH);
|
||||||
|
Path testFile = new Path(testFolderPath, TEST_CHILD_FILE);
|
||||||
try {
|
try {
|
||||||
fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null);
|
fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null);
|
||||||
fail("Should've thrown");
|
fail("Should've thrown");
|
||||||
|
@ -101,7 +102,7 @@ public class ITestAzureBlobFileSystemCreate extends
|
||||||
fs.registerListener(new TracingHeaderValidator(
|
fs.registerListener(new TracingHeaderValidator(
|
||||||
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
|
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
|
||||||
fs.getFileSystemId(), FSOperationType.MKDIR, false, 0));
|
fs.getFileSystemId(), FSOperationType.MKDIR, false, 0));
|
||||||
fs.mkdirs(TEST_FOLDER_PATH);
|
fs.mkdirs(testFolderPath);
|
||||||
fs.registerListener(null);
|
fs.registerListener(null);
|
||||||
|
|
||||||
fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
|
fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
|
||||||
|
@ -113,13 +114,14 @@ public class ITestAzureBlobFileSystemCreate extends
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public void testCreateNonRecursive1() throws Exception {
|
public void testCreateNonRecursive1() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
|
Path testFolderPath = path(TEST_FOLDER_PATH);
|
||||||
|
Path testFile = new Path(testFolderPath, TEST_CHILD_FILE);
|
||||||
try {
|
try {
|
||||||
fs.createNonRecursive(testFile, FsPermission.getDefault(), EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 1024, (short) 1, 1024, null);
|
fs.createNonRecursive(testFile, FsPermission.getDefault(), EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 1024, (short) 1, 1024, null);
|
||||||
fail("Should've thrown");
|
fail("Should've thrown");
|
||||||
} catch (FileNotFoundException expected) {
|
} catch (FileNotFoundException expected) {
|
||||||
}
|
}
|
||||||
fs.mkdirs(TEST_FOLDER_PATH);
|
fs.mkdirs(testFolderPath);
|
||||||
fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
|
fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
|
||||||
.close();
|
.close();
|
||||||
assertIsFile(fs, testFile);
|
assertIsFile(fs, testFile);
|
||||||
|
@ -131,13 +133,14 @@ public class ITestAzureBlobFileSystemCreate extends
|
||||||
public void testCreateNonRecursive2() throws Exception {
|
public void testCreateNonRecursive2() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
|
|
||||||
Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
|
Path testFolderPath = path(TEST_FOLDER_PATH);
|
||||||
|
Path testFile = new Path(testFolderPath, TEST_CHILD_FILE);
|
||||||
try {
|
try {
|
||||||
fs.createNonRecursive(testFile, FsPermission.getDefault(), false, 1024, (short) 1, 1024, null);
|
fs.createNonRecursive(testFile, FsPermission.getDefault(), false, 1024, (short) 1, 1024, null);
|
||||||
fail("Should've thrown");
|
fail("Should've thrown");
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
}
|
}
|
||||||
fs.mkdirs(TEST_FOLDER_PATH);
|
fs.mkdirs(testFolderPath);
|
||||||
fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
|
fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
|
||||||
.close();
|
.close();
|
||||||
assertIsFile(fs, testFile);
|
assertIsFile(fs, testFile);
|
||||||
|
@ -149,7 +152,8 @@ public class ITestAzureBlobFileSystemCreate extends
|
||||||
@Test
|
@Test
|
||||||
public void testWriteAfterClose() throws Throwable {
|
public void testWriteAfterClose() throws Throwable {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
|
Path testFolderPath = path(TEST_FOLDER_PATH);
|
||||||
|
Path testPath = new Path(testFolderPath, TEST_CHILD_FILE);
|
||||||
FSDataOutputStream out = fs.create(testPath);
|
FSDataOutputStream out = fs.create(testPath);
|
||||||
out.close();
|
out.close();
|
||||||
intercept(IOException.class, () -> out.write('a'));
|
intercept(IOException.class, () -> out.write('a'));
|
||||||
|
@ -169,7 +173,8 @@ public class ITestAzureBlobFileSystemCreate extends
|
||||||
@Test
|
@Test
|
||||||
public void testTryWithResources() throws Throwable {
|
public void testTryWithResources() throws Throwable {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
|
Path testFolderPath = path(TEST_FOLDER_PATH);
|
||||||
|
Path testPath = new Path(testFolderPath, TEST_CHILD_FILE);
|
||||||
try (FSDataOutputStream out = fs.create(testPath)) {
|
try (FSDataOutputStream out = fs.create(testPath)) {
|
||||||
out.write('1');
|
out.write('1');
|
||||||
out.hsync();
|
out.hsync();
|
||||||
|
@ -202,7 +207,8 @@ public class ITestAzureBlobFileSystemCreate extends
|
||||||
@Test
|
@Test
|
||||||
public void testFilterFSWriteAfterClose() throws Throwable {
|
public void testFilterFSWriteAfterClose() throws Throwable {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
|
Path testFolderPath = path(TEST_FOLDER_PATH);
|
||||||
|
Path testPath = new Path(testFolderPath, TEST_CHILD_FILE);
|
||||||
FSDataOutputStream out = fs.create(testPath);
|
FSDataOutputStream out = fs.create(testPath);
|
||||||
intercept(FileNotFoundException.class,
|
intercept(FileNotFoundException.class,
|
||||||
() -> {
|
() -> {
|
||||||
|
|
|
@ -53,6 +53,8 @@ import org.apache.hadoop.security.AccessControlException;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
|
||||||
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH;
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH;
|
||||||
import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
|
import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
|
||||||
import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
|
import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
|
||||||
import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
|
import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
|
||||||
import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
|
import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
|
||||||
|
@ -223,15 +225,15 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
|
||||||
stream.writeBytes("hello");
|
stream.writeBytes("hello");
|
||||||
}
|
}
|
||||||
|
|
||||||
assertFalse(fs.exists(destinationPath));
|
assertPathDoesNotExist(fs, "This path should not exist", destinationPath);
|
||||||
fs.rename(sourcePath, destinationPath);
|
fs.rename(sourcePath, destinationPath);
|
||||||
assertFalse(fs.exists(sourcePath));
|
assertPathDoesNotExist(fs, "This path should not exist", sourcePath);
|
||||||
assertTrue(fs.exists(destinationPath));
|
assertPathExists(fs, "This path should exist", destinationPath);
|
||||||
|
|
||||||
assertFalse(fs.exists(destinationDir));
|
assertPathDoesNotExist(fs, "This path should not exist", destinationDir);
|
||||||
fs.rename(sourceDir, destinationDir);
|
fs.rename(sourceDir, destinationDir);
|
||||||
assertFalse(fs.exists(sourceDir));
|
assertPathDoesNotExist(fs, "This path should not exist", sourceDir);
|
||||||
assertTrue(fs.exists(destinationDir));
|
assertPathExists(fs, "This path should exist", destinationDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -246,13 +248,13 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
|
||||||
stream.writeBytes("hello");
|
stream.writeBytes("hello");
|
||||||
}
|
}
|
||||||
|
|
||||||
assertTrue(fs.exists(filePath));
|
assertPathExists(fs, "This path should exist", filePath);
|
||||||
fs.delete(filePath, false);
|
fs.delete(filePath, false);
|
||||||
assertFalse(fs.exists(filePath));
|
assertPathDoesNotExist(fs, "This path should not exist", filePath);
|
||||||
|
|
||||||
assertTrue(fs.exists(dirPath));
|
assertPathExists(fs, "This path should exist", dirPath);
|
||||||
fs.delete(dirPath, false);
|
fs.delete(dirPath, false);
|
||||||
assertFalse(fs.exists(dirPath));
|
assertPathDoesNotExist(fs, "This path should not exist", dirPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -267,11 +269,11 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
|
||||||
stream.writeBytes("hello");
|
stream.writeBytes("hello");
|
||||||
}
|
}
|
||||||
|
|
||||||
assertTrue(fs.exists(dirPath));
|
assertPathExists(fs, "This path should exist", dirPath);
|
||||||
assertTrue(fs.exists(filePath));
|
assertPathExists(fs, "This path should exist", filePath);
|
||||||
fs.delete(dirPath, true);
|
fs.delete(dirPath, true);
|
||||||
assertFalse(fs.exists(filePath));
|
assertPathDoesNotExist(fs, "This path should not exist", filePath);
|
||||||
assertFalse(fs.exists(dirPath));
|
assertPathDoesNotExist(fs, "This path should not exist", dirPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -395,8 +397,8 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
|
||||||
@Test
|
@Test
|
||||||
public void testSignatureMask() throws Exception {
|
public void testSignatureMask() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
String src = "/testABC/test.xt";
|
String src = String.format("/testABC/test%s.xt", UUID.randomUUID());
|
||||||
fs.create(new Path(src));
|
fs.create(new Path(src)).close();
|
||||||
AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
|
AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
|
||||||
.renamePath(src, "/testABC" + "/abc.txt", null,
|
.renamePath(src, "/testABC" + "/abc.txt", null,
|
||||||
getTestTracingContext(fs, false));
|
getTestTracingContext(fs, false));
|
||||||
|
|
|
@ -79,12 +79,13 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||||
public void testDeleteRoot() throws Exception {
|
public void testDeleteRoot() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
|
|
||||||
fs.mkdirs(new Path("/testFolder0"));
|
Path testPath = path("/testFolder");
|
||||||
fs.mkdirs(new Path("/testFolder1"));
|
fs.mkdirs(new Path(testPath + "_0"));
|
||||||
fs.mkdirs(new Path("/testFolder2"));
|
fs.mkdirs(new Path(testPath + "_1"));
|
||||||
touch(new Path("/testFolder1/testfile"));
|
fs.mkdirs(new Path(testPath + "_2"));
|
||||||
touch(new Path("/testFolder1/testfile2"));
|
touch(new Path(testPath + "_1/testfile"));
|
||||||
touch(new Path("/testFolder1/testfile3"));
|
touch(new Path(testPath + "_1/testfile2"));
|
||||||
|
touch(new Path(testPath + "_1/testfile3"));
|
||||||
|
|
||||||
Path root = new Path("/");
|
Path root = new Path("/");
|
||||||
FileStatus[] ls = fs.listStatus(root);
|
FileStatus[] ls = fs.listStatus(root);
|
||||||
|
@ -98,7 +99,7 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||||
@Test()
|
@Test()
|
||||||
public void testOpenFileAfterDelete() throws Exception {
|
public void testOpenFileAfterDelete() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path testfile = new Path("/testFile");
|
Path testfile = path("/testFile");
|
||||||
touch(testfile);
|
touch(testfile);
|
||||||
assertDeleted(fs, testfile, false);
|
assertDeleted(fs, testfile, false);
|
||||||
|
|
||||||
|
@ -109,7 +110,7 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||||
@Test
|
@Test
|
||||||
public void testEnsureFileIsDeleted() throws Exception {
|
public void testEnsureFileIsDeleted() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path testfile = new Path("testfile");
|
Path testfile = path("testfile");
|
||||||
touch(testfile);
|
touch(testfile);
|
||||||
assertDeleted(fs, testfile, false);
|
assertDeleted(fs, testfile, false);
|
||||||
assertPathDoesNotExist(fs, "deleted", testfile);
|
assertPathDoesNotExist(fs, "deleted", testfile);
|
||||||
|
@ -118,10 +119,10 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteDirectory() throws Exception {
|
public void testDeleteDirectory() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path dir = new Path("testfile");
|
Path dir = path("testfile");
|
||||||
fs.mkdirs(dir);
|
fs.mkdirs(dir);
|
||||||
fs.mkdirs(new Path("testfile/test1"));
|
fs.mkdirs(new Path(dir + "/test1"));
|
||||||
fs.mkdirs(new Path("testfile/test1/test2"));
|
fs.mkdirs(new Path(dir + "/test1/test2"));
|
||||||
|
|
||||||
assertDeleted(fs, dir, true);
|
assertDeleted(fs, dir, true);
|
||||||
assertPathDoesNotExist(fs, "deleted", dir);
|
assertPathDoesNotExist(fs, "deleted", dir);
|
||||||
|
@ -133,8 +134,9 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||||
final List<Future<Void>> tasks = new ArrayList<>();
|
final List<Future<Void>> tasks = new ArrayList<>();
|
||||||
|
|
||||||
ExecutorService es = Executors.newFixedThreadPool(10);
|
ExecutorService es = Executors.newFixedThreadPool(10);
|
||||||
|
Path dir = path("/test");
|
||||||
for (int i = 0; i < 1000; i++) {
|
for (int i = 0; i < 1000; i++) {
|
||||||
final Path fileName = new Path("/test/" + i);
|
final Path fileName = new Path(dir + "/" + i);
|
||||||
Callable<Void> callable = new Callable<Void>() {
|
Callable<Void> callable = new Callable<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception {
|
public Void call() throws Exception {
|
||||||
|
@ -151,7 +153,6 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||||
}
|
}
|
||||||
|
|
||||||
es.shutdownNow();
|
es.shutdownNow();
|
||||||
Path dir = new Path("/test");
|
|
||||||
fs.registerListener(new TracingHeaderValidator(
|
fs.registerListener(new TracingHeaderValidator(
|
||||||
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
|
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
|
||||||
fs.getFileSystemId(), FSOperationType.DELETE, false, 0));
|
fs.getFileSystemId(), FSOperationType.DELETE, false, 0));
|
||||||
|
|
|
@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -52,14 +54,14 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteOneByteToFile() throws Exception {
|
public void testWriteOneByteToFile() throws Exception {
|
||||||
final Path testFilePath = new Path(methodName.getMethodName());
|
final Path testFilePath = path(methodName.getMethodName());
|
||||||
testWriteOneByteToFile(testFilePath);
|
testWriteOneByteToFile(testFilePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReadWriteBytesToFile() throws Exception {
|
public void testReadWriteBytesToFile() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
final Path testFilePath = new Path(methodName.getMethodName());
|
final Path testFilePath = path(methodName.getMethodName());
|
||||||
testWriteOneByteToFile(testFilePath);
|
testWriteOneByteToFile(testFilePath);
|
||||||
try(FSDataInputStream inputStream = fs.open(testFilePath,
|
try(FSDataInputStream inputStream = fs.open(testFilePath,
|
||||||
TEST_DEFAULT_BUFFER_SIZE)) {
|
TEST_DEFAULT_BUFFER_SIZE)) {
|
||||||
|
@ -78,7 +80,7 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
|
||||||
final byte[] b = new byte[2 * readBufferSize];
|
final byte[] b = new byte[2 * readBufferSize];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
|
|
||||||
final Path testFilePath = new Path(methodName.getMethodName());
|
final Path testFilePath = path(methodName.getMethodName());
|
||||||
try(FSDataOutputStream writeStream = fs.create(testFilePath)) {
|
try(FSDataOutputStream writeStream = fs.create(testFilePath)) {
|
||||||
writeStream.write(b);
|
writeStream.write(b);
|
||||||
writeStream.flush();
|
writeStream.flush();
|
||||||
|
@ -107,7 +109,7 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
|
||||||
byte[] bytesToRead = new byte[readBufferSize];
|
byte[] bytesToRead = new byte[readBufferSize];
|
||||||
final byte[] b = new byte[2 * readBufferSize];
|
final byte[] b = new byte[2 * readBufferSize];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
final Path testFilePath = new Path(methodName.getMethodName());
|
final Path testFilePath = path(methodName.getMethodName());
|
||||||
|
|
||||||
try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
|
try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
|
||||||
writeStream.write(b);
|
writeStream.write(b);
|
||||||
|
@ -130,7 +132,7 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
|
||||||
@Test
|
@Test
|
||||||
public void testWriteWithBufferOffset() throws Exception {
|
public void testWriteWithBufferOffset() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
final Path testFilePath = new Path(methodName.getMethodName());
|
final Path testFilePath = path(methodName.getMethodName());
|
||||||
|
|
||||||
final byte[] b = new byte[1024 * 1000];
|
final byte[] b = new byte[1024 * 1000];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
|
@ -151,7 +153,7 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
|
||||||
@Test
|
@Test
|
||||||
public void testReadWriteHeavyBytesToFileWithSmallerChunks() throws Exception {
|
public void testReadWriteHeavyBytesToFileWithSmallerChunks() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
final Path testFilePath = new Path(methodName.getMethodName());
|
final Path testFilePath = path(methodName.getMethodName());
|
||||||
|
|
||||||
final byte[] writeBuffer = new byte[5 * 1000 * 1024];
|
final byte[] writeBuffer = new byte[5 * 1000 * 1024];
|
||||||
new Random().nextBytes(writeBuffer);
|
new Random().nextBytes(writeBuffer);
|
||||||
|
@ -171,50 +173,51 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
|
||||||
@Test
|
@Test
|
||||||
public void testReadWithFileNotFoundException() throws Exception {
|
public void testReadWithFileNotFoundException() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
final Path testFilePath = new Path(methodName.getMethodName());
|
final Path testFilePath = path(methodName.getMethodName());
|
||||||
testWriteOneByteToFile(testFilePath);
|
testWriteOneByteToFile(testFilePath);
|
||||||
|
|
||||||
FSDataInputStream inputStream = fs.open(testFilePath, TEST_DEFAULT_BUFFER_SIZE);
|
try (FSDataInputStream inputStream = fs.open(testFilePath,
|
||||||
|
TEST_DEFAULT_BUFFER_SIZE)) {
|
||||||
fs.delete(testFilePath, true);
|
fs.delete(testFilePath, true);
|
||||||
assertFalse(fs.exists(testFilePath));
|
assertPathDoesNotExist(fs, "This path should not exist", testFilePath);
|
||||||
|
|
||||||
intercept(FileNotFoundException.class,
|
intercept(FileNotFoundException.class, () -> inputStream.read(new byte[1]));
|
||||||
() -> inputStream.read(new byte[1]));
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteWithFileNotFoundException() throws Exception {
|
public void testWriteWithFileNotFoundException() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
final Path testFilePath = new Path(methodName.getMethodName());
|
final Path testFilePath = path(methodName.getMethodName());
|
||||||
|
|
||||||
FSDataOutputStream stream = fs.create(testFilePath);
|
try (FSDataOutputStream stream = fs.create(testFilePath)) {
|
||||||
assertTrue(fs.exists(testFilePath));
|
assertPathExists(fs, "Path should exist", testFilePath);
|
||||||
stream.write(TEST_BYTE);
|
stream.write(TEST_BYTE);
|
||||||
|
|
||||||
fs.delete(testFilePath, true);
|
fs.delete(testFilePath, true);
|
||||||
assertFalse(fs.exists(testFilePath));
|
assertPathDoesNotExist(fs, "This path should not exist", testFilePath);
|
||||||
|
|
||||||
// trigger append call
|
// trigger append call
|
||||||
intercept(FileNotFoundException.class,
|
intercept(FileNotFoundException.class, () -> stream.close());
|
||||||
() -> stream.close());
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFlushWithFileNotFoundException() throws Exception {
|
public void testFlushWithFileNotFoundException() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
final Path testFilePath = new Path(methodName.getMethodName());
|
final Path testFilePath = path(methodName.getMethodName());
|
||||||
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
|
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
FSDataOutputStream stream = fs.create(testFilePath);
|
try (FSDataOutputStream stream = fs.create(testFilePath)) {
|
||||||
assertTrue(fs.exists(testFilePath));
|
assertPathExists(fs, "This path should exist", testFilePath);
|
||||||
|
|
||||||
fs.delete(testFilePath, true);
|
fs.delete(testFilePath, true);
|
||||||
assertFalse(fs.exists(testFilePath));
|
assertPathDoesNotExist(fs, "This path should not exist", testFilePath);
|
||||||
|
|
||||||
intercept(FileNotFoundException.class,
|
intercept(FileNotFoundException.class, () -> stream.close());
|
||||||
() -> stream.close());
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testWriteOneByteToFile(Path testFilePath) throws Exception {
|
private void testWriteOneByteToFile(Path testFilePath) throws Exception {
|
||||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test FileStatus.
|
* Test FileStatus.
|
||||||
*/
|
*/
|
||||||
|
@ -37,8 +39,8 @@ public class ITestAzureBlobFileSystemFileStatus extends
|
||||||
private static final String DEFAULT_UMASK_VALUE = "027";
|
private static final String DEFAULT_UMASK_VALUE = "027";
|
||||||
private static final String FULL_PERMISSION = "777";
|
private static final String FULL_PERMISSION = "777";
|
||||||
|
|
||||||
private static final Path TEST_FILE = new Path("testFile");
|
private static final String TEST_FILE = "testFile";
|
||||||
private static final Path TEST_FOLDER = new Path("testDir");
|
private static final String TEST_FOLDER = "testDir";
|
||||||
|
|
||||||
public ITestAzureBlobFileSystemFileStatus() throws Exception {
|
public ITestAzureBlobFileSystemFileStatus() throws Exception {
|
||||||
super();
|
super();
|
||||||
|
@ -57,8 +59,9 @@ public class ITestAzureBlobFileSystemFileStatus extends
|
||||||
public void testFileStatusPermissionsAndOwnerAndGroup() throws Exception {
|
public void testFileStatusPermissionsAndOwnerAndGroup() throws Exception {
|
||||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||||
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
|
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
|
||||||
touch(TEST_FILE);
|
Path testFile = path(TEST_FILE);
|
||||||
validateStatus(fs, TEST_FILE, false);
|
touch(testFile);
|
||||||
|
validateStatus(fs, testFile, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private FileStatus validateStatus(final AzureBlobFileSystem fs, final Path name, final boolean isDir)
|
private FileStatus validateStatus(final AzureBlobFileSystem fs, final Path name, final boolean isDir)
|
||||||
|
@ -93,9 +96,10 @@ public class ITestAzureBlobFileSystemFileStatus extends
|
||||||
public void testFolderStatusPermissionsAndOwnerAndGroup() throws Exception {
|
public void testFolderStatusPermissionsAndOwnerAndGroup() throws Exception {
|
||||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||||
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
|
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
|
||||||
fs.mkdirs(TEST_FOLDER);
|
Path testFolder = path(TEST_FOLDER);
|
||||||
|
fs.mkdirs(testFolder);
|
||||||
|
|
||||||
validateStatus(fs, TEST_FOLDER, true);
|
validateStatus(fs, testFolder, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -108,11 +112,11 @@ public class ITestAzureBlobFileSystemFileStatus extends
|
||||||
Path pathwithouthost2 = new Path("/abfs/file2.txt");
|
Path pathwithouthost2 = new Path("/abfs/file2.txt");
|
||||||
|
|
||||||
// verify compatibility of this path format
|
// verify compatibility of this path format
|
||||||
fs.create(pathWithHost1);
|
fs.create(pathWithHost1).close();
|
||||||
assertTrue(fs.exists(pathwithouthost1));
|
assertPathExists(fs, "This path should exist", pathwithouthost1);
|
||||||
|
|
||||||
fs.create(pathwithouthost2);
|
fs.create(pathwithouthost2).close();
|
||||||
assertTrue(fs.exists(pathWithHost2));
|
assertPathExists(fs, "This path should exist", pathWithHost2);
|
||||||
|
|
||||||
// verify get
|
// verify get
|
||||||
FileStatus fileStatus1 = fs.getFileStatus(pathWithHost1);
|
FileStatus fileStatus1 = fs.getFileStatus(pathWithHost1);
|
||||||
|
@ -125,13 +129,13 @@ public class ITestAzureBlobFileSystemFileStatus extends
|
||||||
@Test
|
@Test
|
||||||
public void testLastModifiedTime() throws IOException {
|
public void testLastModifiedTime() throws IOException {
|
||||||
AzureBlobFileSystem fs = this.getFileSystem();
|
AzureBlobFileSystem fs = this.getFileSystem();
|
||||||
Path testFilePath = new Path("childfile1.txt");
|
Path testFilePath = path("childfile1.txt");
|
||||||
long createStartTime = System.currentTimeMillis();
|
long createStartTime = System.currentTimeMillis();
|
||||||
long minCreateStartTime = (createStartTime / 1000) * 1000 - 1;
|
long minCreateStartTime = (createStartTime / 1000) * 1000 - 1;
|
||||||
// Dividing and multiplying by 1000 to make last 3 digits 0.
|
// Dividing and multiplying by 1000 to make last 3 digits 0.
|
||||||
// It is observed that modification time is returned with last 3
|
// It is observed that modification time is returned with last 3
|
||||||
// digits 0 always.
|
// digits 0 always.
|
||||||
fs.create(testFilePath);
|
fs.create(testFilePath).close();
|
||||||
long createEndTime = System.currentTimeMillis();
|
long createEndTime = System.currentTimeMillis();
|
||||||
FileStatus fStat = fs.getFileStatus(testFilePath);
|
FileStatus fStat = fs.getFileStatus(testFilePath);
|
||||||
long lastModifiedTime = fStat.getModificationTime();
|
long lastModifiedTime = fStat.getModificationTime();
|
||||||
|
|
|
@ -316,15 +316,14 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
||||||
|
|
||||||
byte[] buf = new byte[10];
|
byte[] buf = new byte[10];
|
||||||
new Random().nextBytes(buf);
|
new Random().nextBytes(buf);
|
||||||
FSDataOutputStream out = fs.create(new Path("/testFile"));
|
try (FSDataOutputStream out = fs.create(new Path("/testFile"))) {
|
||||||
((AbfsOutputStream) out.getWrappedStream()).registerListener(
|
((AbfsOutputStream) out.getWrappedStream()).registerListener(new TracingHeaderValidator(
|
||||||
new TracingHeaderValidator(
|
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), fs.getFileSystemId(), FSOperationType.WRITE, false, 0,
|
||||||
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
|
|
||||||
fs.getFileSystemId(), FSOperationType.WRITE, false, 0,
|
|
||||||
((AbfsOutputStream) out.getWrappedStream()).getStreamID()));
|
((AbfsOutputStream) out.getWrappedStream()).getStreamID()));
|
||||||
out.write(buf);
|
out.write(buf);
|
||||||
out.hsync();
|
out.hsync();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
|
public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
|
||||||
|
|
|
@ -99,7 +99,7 @@ public class ITestAzureBlobFileSystemListStatus extends
|
||||||
@Test
|
@Test
|
||||||
public void testListFileVsListDir() throws Exception {
|
public void testListFileVsListDir() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path path = new Path("/testFile");
|
Path path = path("/testFile");
|
||||||
try(FSDataOutputStream ignored = fs.create(path)) {
|
try(FSDataOutputStream ignored = fs.create(path)) {
|
||||||
FileStatus[] testFiles = fs.listStatus(path);
|
FileStatus[] testFiles = fs.listStatus(path);
|
||||||
assertEquals("length of test files", 1, testFiles.length);
|
assertEquals("length of test files", 1, testFiles.length);
|
||||||
|
@ -111,19 +111,20 @@ public class ITestAzureBlobFileSystemListStatus extends
|
||||||
@Test
|
@Test
|
||||||
public void testListFileVsListDir2() throws Exception {
|
public void testListFileVsListDir2() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
fs.mkdirs(new Path("/testFolder"));
|
Path testFolder = path("/testFolder");
|
||||||
fs.mkdirs(new Path("/testFolder/testFolder2"));
|
fs.mkdirs(testFolder);
|
||||||
fs.mkdirs(new Path("/testFolder/testFolder2/testFolder3"));
|
fs.mkdirs(new Path(testFolder + "/testFolder2"));
|
||||||
Path testFile0Path = new Path("/testFolder/testFolder2/testFolder3/testFile");
|
fs.mkdirs(new Path(testFolder + "/testFolder2/testFolder3"));
|
||||||
|
Path testFile0Path = new Path(
|
||||||
|
testFolder + "/testFolder2/testFolder3/testFile");
|
||||||
ContractTestUtils.touch(fs, testFile0Path);
|
ContractTestUtils.touch(fs, testFile0Path);
|
||||||
|
|
||||||
FileStatus[] testFiles = fs.listStatus(testFile0Path);
|
FileStatus[] testFiles = fs.listStatus(testFile0Path);
|
||||||
assertEquals("Wrong listing size of file " + testFile0Path,
|
assertEquals("Wrong listing size of file " + testFile0Path,
|
||||||
1, testFiles.length);
|
1, testFiles.length);
|
||||||
FileStatus file0 = testFiles[0];
|
FileStatus file0 = testFiles[0];
|
||||||
assertEquals("Wrong path for " + file0,
|
assertEquals("Wrong path for " + file0, new Path(getTestUrl(),
|
||||||
new Path(getTestUrl(), "/testFolder/testFolder2/testFolder3/testFile"),
|
testFolder + "/testFolder2/testFolder3/testFile"), file0.getPath());
|
||||||
file0.getPath());
|
|
||||||
assertIsFileReference(file0);
|
assertIsFileReference(file0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,18 +137,18 @@ public class ITestAzureBlobFileSystemListStatus extends
|
||||||
@Test
|
@Test
|
||||||
public void testListFiles() throws Exception {
|
public void testListFiles() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path testDir = new Path("/test");
|
Path testDir = path("/test");
|
||||||
fs.mkdirs(testDir);
|
fs.mkdirs(testDir);
|
||||||
|
|
||||||
FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
|
FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
|
||||||
assertEquals(1, fileStatuses.length);
|
assertEquals(1, fileStatuses.length);
|
||||||
|
|
||||||
fs.mkdirs(new Path("/test/sub"));
|
fs.mkdirs(new Path(testDir + "/sub"));
|
||||||
fileStatuses = fs.listStatus(testDir);
|
fileStatuses = fs.listStatus(testDir);
|
||||||
assertEquals(1, fileStatuses.length);
|
assertEquals(1, fileStatuses.length);
|
||||||
assertEquals("sub", fileStatuses[0].getPath().getName());
|
assertEquals("sub", fileStatuses[0].getPath().getName());
|
||||||
assertIsDirectoryReference(fileStatuses[0]);
|
assertIsDirectoryReference(fileStatuses[0]);
|
||||||
Path childF = fs.makeQualified(new Path("/test/f"));
|
Path childF = fs.makeQualified(new Path(testDir + "/f"));
|
||||||
touch(childF);
|
touch(childF);
|
||||||
fileStatuses = fs.listStatus(testDir);
|
fileStatuses = fs.listStatus(testDir);
|
||||||
assertEquals(2, fileStatuses.length);
|
assertEquals(2, fileStatuses.length);
|
||||||
|
@ -193,7 +194,7 @@ public class ITestAzureBlobFileSystemListStatus extends
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
|
|
||||||
Path nontrailingPeriodDir = path("testTrailingDir/dir");
|
Path nontrailingPeriodDir = path("testTrailingDir/dir");
|
||||||
Path trailingPeriodDir = path("testTrailingDir/dir.");
|
Path trailingPeriodDir = new Path("testMkdirTrailingDir/dir.");
|
||||||
|
|
||||||
assertMkdirs(fs, nontrailingPeriodDir);
|
assertMkdirs(fs, nontrailingPeriodDir);
|
||||||
|
|
||||||
|
@ -212,8 +213,8 @@ public class ITestAzureBlobFileSystemListStatus extends
|
||||||
boolean exceptionThrown = false;
|
boolean exceptionThrown = false;
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
|
|
||||||
Path trailingPeriodFile = path("testTrailingDir/file.");
|
Path trailingPeriodFile = new Path("testTrailingDir/file.");
|
||||||
Path nontrailingPeriodFile = path("testTrailingDir/file");
|
Path nontrailingPeriodFile = path("testCreateTrailingDir/file");
|
||||||
|
|
||||||
createFile(fs, nontrailingPeriodFile, false, new byte[0]);
|
createFile(fs, nontrailingPeriodFile, false, new byte[0]);
|
||||||
assertPathExists(fs, "Trailing period file does not exist",
|
assertPathExists(fs, "Trailing period file does not exist",
|
||||||
|
@ -235,7 +236,7 @@ public class ITestAzureBlobFileSystemListStatus extends
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
|
|
||||||
Path nonTrailingPeriodFile = path("testTrailingDir/file");
|
Path nonTrailingPeriodFile = path("testTrailingDir/file");
|
||||||
Path trailingPeriodFile = path("testTrailingDir/file.");
|
Path trailingPeriodFile = new Path("testRenameTrailingDir/file.");
|
||||||
|
|
||||||
createFile(fs, nonTrailingPeriodFile, false, new byte[0]);
|
createFile(fs, nonTrailingPeriodFile, false, new byte[0]);
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -49,7 +49,7 @@ public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
|
||||||
DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE || !getIsNamespaceEnabled(
|
DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE || !getIsNamespaceEnabled(
|
||||||
getFileSystem()));
|
getFileSystem()));
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path path = new Path("testFolder");
|
Path path = path("testFolder");
|
||||||
assertMkdirs(fs, path);
|
assertMkdirs(fs, path);
|
||||||
assertMkdirs(fs, path);
|
assertMkdirs(fs, path);
|
||||||
}
|
}
|
||||||
|
@ -64,7 +64,7 @@ public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
|
||||||
Configuration config = new Configuration(this.getRawConfiguration());
|
Configuration config = new Configuration(this.getRawConfiguration());
|
||||||
config.set(FS_AZURE_ENABLE_MKDIR_OVERWRITE, Boolean.toString(false));
|
config.set(FS_AZURE_ENABLE_MKDIR_OVERWRITE, Boolean.toString(false));
|
||||||
AzureBlobFileSystem fs = getFileSystem(config);
|
AzureBlobFileSystem fs = getFileSystem(config);
|
||||||
Path path = new Path("testFolder");
|
Path path = path("testFolder");
|
||||||
assertMkdirs(fs, path); //checks that mkdirs returns true
|
assertMkdirs(fs, path); //checks that mkdirs returns true
|
||||||
long timeCreated = fs.getFileStatus(path).getModificationTime();
|
long timeCreated = fs.getFileStatus(path).getModificationTime();
|
||||||
assertMkdirs(fs, path); //call to existing dir should return success
|
assertMkdirs(fs, path); //call to existing dir should return success
|
||||||
|
@ -78,8 +78,8 @@ public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
|
||||||
DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE && getIsNamespaceEnabled(
|
DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE && getIsNamespaceEnabled(
|
||||||
getFileSystem()));
|
getFileSystem()));
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path path = new Path("testFilePath");
|
Path path = path("testFilePath");
|
||||||
fs.create(path);
|
fs.create(path).close();
|
||||||
assertTrue(fs.getFileStatus(path).isFile());
|
assertTrue(fs.getFileStatus(path).isFile());
|
||||||
intercept(FileAlreadyExistsException.class, () -> fs.mkdirs(path));
|
intercept(FileAlreadyExistsException.class, () -> fs.mkdirs(path));
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_A
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET;
|
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_ID;
|
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_ID;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET;
|
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test Azure Oauth with Blob Data contributor role and Blob Data Reader role.
|
* Test Azure Oauth with Blob Data contributor role and Blob Data Reader role.
|
||||||
|
@ -54,8 +56,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_A
|
||||||
public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
|
public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
|
||||||
|
|
||||||
private static final Path FILE_PATH = new Path("/testFile");
|
private static final Path FILE_PATH = new Path("/testFile");
|
||||||
private static final Path EXISTED_FILE_PATH = new Path("/existedFile");
|
private static final String EXISTED_FILE_PATH = "/existedFile";
|
||||||
private static final Path EXISTED_FOLDER_PATH = new Path("/existedFolder");
|
private static final String EXISTED_FOLDER_PATH = "/existedFolder";
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);
|
LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);
|
||||||
|
|
||||||
|
@ -72,7 +74,9 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
|
||||||
String secret = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET);
|
String secret = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET);
|
||||||
Assume.assumeTrue("Contributor client secret not provided", secret != null);
|
Assume.assumeTrue("Contributor client secret not provided", secret != null);
|
||||||
|
|
||||||
prepareFiles();
|
Path existedFilePath = path(EXISTED_FILE_PATH);
|
||||||
|
Path existedFolderPath = path(EXISTED_FOLDER_PATH);
|
||||||
|
prepareFiles(existedFilePath, existedFolderPath);
|
||||||
|
|
||||||
final AzureBlobFileSystem fs = getBlobConributor();
|
final AzureBlobFileSystem fs = getBlobConributor();
|
||||||
|
|
||||||
|
@ -80,39 +84,39 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
|
||||||
try(FSDataOutputStream stream = fs.create(FILE_PATH)) {
|
try(FSDataOutputStream stream = fs.create(FILE_PATH)) {
|
||||||
stream.write(0);
|
stream.write(0);
|
||||||
}
|
}
|
||||||
assertTrue(fs.exists(FILE_PATH));
|
assertPathExists(fs, "This path should exist", FILE_PATH);
|
||||||
FileStatus fileStatus = fs.getFileStatus(FILE_PATH);
|
FileStatus fileStatus = fs.getFileStatus(FILE_PATH);
|
||||||
assertEquals(1, fileStatus.getLen());
|
assertEquals(1, fileStatus.getLen());
|
||||||
// delete file
|
// delete file
|
||||||
assertTrue(fs.delete(FILE_PATH, true));
|
assertTrue(fs.delete(FILE_PATH, true));
|
||||||
assertFalse(fs.exists(FILE_PATH));
|
assertPathDoesNotExist(fs, "This path should not exist", FILE_PATH);
|
||||||
|
|
||||||
// Verify Blob Data Contributor has full access to existed folder, file
|
// Verify Blob Data Contributor has full access to existed folder, file
|
||||||
|
|
||||||
// READ FOLDER
|
// READ FOLDER
|
||||||
assertTrue(fs.exists(EXISTED_FOLDER_PATH));
|
assertPathExists(fs, "This path should exist", existedFolderPath);
|
||||||
|
|
||||||
//DELETE FOLDER
|
//DELETE FOLDER
|
||||||
fs.delete(EXISTED_FOLDER_PATH, true);
|
fs.delete(existedFolderPath, true);
|
||||||
assertFalse(fs.exists(EXISTED_FOLDER_PATH));
|
assertPathDoesNotExist(fs, "This path should not exist", existedFolderPath);
|
||||||
|
|
||||||
// READ FILE
|
// READ FILE
|
||||||
try (FSDataInputStream stream = fs.open(EXISTED_FILE_PATH)) {
|
try (FSDataInputStream stream = fs.open(existedFilePath)) {
|
||||||
assertTrue(stream.read() != 0);
|
assertTrue(stream.read() != 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(0, fs.getFileStatus(EXISTED_FILE_PATH).getLen());
|
assertEquals(0, fs.getFileStatus(existedFilePath).getLen());
|
||||||
|
|
||||||
// WRITE FILE
|
// WRITE FILE
|
||||||
try (FSDataOutputStream stream = fs.append(EXISTED_FILE_PATH)) {
|
try (FSDataOutputStream stream = fs.append(existedFilePath)) {
|
||||||
stream.write(0);
|
stream.write(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(1, fs.getFileStatus(EXISTED_FILE_PATH).getLen());
|
assertEquals(1, fs.getFileStatus(existedFilePath).getLen());
|
||||||
|
|
||||||
// REMOVE FILE
|
// REMOVE FILE
|
||||||
fs.delete(EXISTED_FILE_PATH, true);
|
fs.delete(existedFilePath, true);
|
||||||
assertFalse(fs.exists(EXISTED_FILE_PATH));
|
assertPathDoesNotExist(fs, "This path should not exist", existedFilePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -125,7 +129,9 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
|
||||||
String secret = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET);
|
String secret = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET);
|
||||||
Assume.assumeTrue("Reader client secret not provided", secret != null);
|
Assume.assumeTrue("Reader client secret not provided", secret != null);
|
||||||
|
|
||||||
prepareFiles();
|
Path existedFilePath = path(EXISTED_FILE_PATH);
|
||||||
|
Path existedFolderPath = path(EXISTED_FOLDER_PATH);
|
||||||
|
prepareFiles(existedFilePath, existedFolderPath);
|
||||||
final AzureBlobFileSystem fs = getBlobReader();
|
final AzureBlobFileSystem fs = getBlobReader();
|
||||||
|
|
||||||
// Use abfsStore in this test to verify the ERROR code in AbfsRestOperationException
|
// Use abfsStore in this test to verify the ERROR code in AbfsRestOperationException
|
||||||
|
@ -134,24 +140,24 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
|
||||||
// TEST READ FS
|
// TEST READ FS
|
||||||
Map<String, String> properties = abfsStore.getFilesystemProperties(tracingContext);
|
Map<String, String> properties = abfsStore.getFilesystemProperties(tracingContext);
|
||||||
// TEST READ FOLDER
|
// TEST READ FOLDER
|
||||||
assertTrue(fs.exists(EXISTED_FOLDER_PATH));
|
assertPathExists(fs, "This path should exist", existedFolderPath);
|
||||||
|
|
||||||
// TEST DELETE FOLDER
|
// TEST DELETE FOLDER
|
||||||
try {
|
try {
|
||||||
abfsStore.delete(EXISTED_FOLDER_PATH, true, tracingContext);
|
abfsStore.delete(existedFolderPath, true, tracingContext);
|
||||||
} catch (AbfsRestOperationException e) {
|
} catch (AbfsRestOperationException e) {
|
||||||
assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode());
|
assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
// TEST READ FILE
|
// TEST READ FILE
|
||||||
try (InputStream inputStream = abfsStore.openFileForRead(EXISTED_FILE_PATH, null,
|
try (InputStream inputStream = abfsStore
|
||||||
tracingContext)) {
|
.openFileForRead(existedFilePath, null, tracingContext)) {
|
||||||
assertTrue(inputStream.read() != 0);
|
assertTrue(inputStream.read() != 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TEST WRITE FILE
|
// TEST WRITE FILE
|
||||||
try {
|
try {
|
||||||
abfsStore.openFileForWrite(EXISTED_FILE_PATH, fs.getFsStatistics(), true,
|
abfsStore.openFileForWrite(existedFilePath, fs.getFsStatistics(), true,
|
||||||
tracingContext);
|
tracingContext);
|
||||||
} catch (AbfsRestOperationException e) {
|
} catch (AbfsRestOperationException e) {
|
||||||
assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode());
|
assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode());
|
||||||
|
@ -161,14 +167,14 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void prepareFiles() throws IOException {
|
private void prepareFiles(Path existedFilePath, Path existedFolderPath) throws IOException {
|
||||||
// create test files/folders to verify access control diff between
|
// create test files/folders to verify access control diff between
|
||||||
// Blob data contributor and Blob data reader
|
// Blob data contributor and Blob data reader
|
||||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||||
fs.create(EXISTED_FILE_PATH);
|
fs.create(existedFilePath).close();
|
||||||
assertTrue(fs.exists(EXISTED_FILE_PATH));
|
assertPathExists(fs, "This path should exist", existedFilePath);
|
||||||
fs.mkdirs(EXISTED_FOLDER_PATH);
|
fs.mkdirs(existedFolderPath);
|
||||||
assertTrue(fs.exists(EXISTED_FOLDER_PATH));
|
assertPathExists(fs, "This path should exist", existedFolderPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
private AzureBlobFileSystem getBlobConributor() throws Exception {
|
private AzureBlobFileSystem getBlobConributor() throws Exception {
|
||||||
|
|
|
@ -84,7 +84,8 @@ public class ITestAzureBlobFileSystemPermission extends AbstractAbfsIntegrationT
|
||||||
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
|
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
|
||||||
fs.removeDefaultAcl(path.getParent());
|
fs.removeDefaultAcl(path.getParent());
|
||||||
|
|
||||||
fs.create(path, permission, true, KILOBYTE, (short) 1, KILOBYTE - 1, null);
|
fs.create(path, permission, true, KILOBYTE, (short) 1, KILOBYTE - 1,
|
||||||
|
null).close();
|
||||||
FileStatus status = fs.getFileStatus(path);
|
FileStatus status = fs.getFileStatus(path);
|
||||||
Assert.assertEquals(permission.applyUMask(DEFAULT_UMASK_PERMISSION), status.getPermission());
|
Assert.assertEquals(permission.applyUMask(DEFAULT_UMASK_PERMISSION), status.getPermission());
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,7 +86,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBasicRead() throws Exception {
|
public void testBasicRead() throws Exception {
|
||||||
Path testPath = new Path(TEST_FILE_PREFIX + "_testBasicRead");
|
Path testPath = path(TEST_FILE_PREFIX + "_testBasicRead");
|
||||||
assumeHugeFileExists(testPath);
|
assumeHugeFileExists(testPath);
|
||||||
|
|
||||||
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
||||||
|
@ -115,7 +115,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
|
||||||
public void testRandomRead() throws Exception {
|
public void testRandomRead() throws Exception {
|
||||||
Assume.assumeFalse("This test does not support namespace enabled account",
|
Assume.assumeFalse("This test does not support namespace enabled account",
|
||||||
getIsNamespaceEnabled(getFileSystem()));
|
getIsNamespaceEnabled(getFileSystem()));
|
||||||
Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomRead");
|
Path testPath = path(TEST_FILE_PREFIX + "_testRandomRead");
|
||||||
assumeHugeFileExists(testPath);
|
assumeHugeFileExists(testPath);
|
||||||
|
|
||||||
try (
|
try (
|
||||||
|
@ -174,7 +174,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSeekToNewSource() throws Exception {
|
public void testSeekToNewSource() throws Exception {
|
||||||
Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekToNewSource");
|
Path testPath = path(TEST_FILE_PREFIX + "_testSeekToNewSource");
|
||||||
assumeHugeFileExists(testPath);
|
assumeHugeFileExists(testPath);
|
||||||
|
|
||||||
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
||||||
|
@ -189,7 +189,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSkipBounds() throws Exception {
|
public void testSkipBounds() throws Exception {
|
||||||
Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipBounds");
|
Path testPath = path(TEST_FILE_PREFIX + "_testSkipBounds");
|
||||||
long testFileLength = assumeHugeFileExists(testPath);
|
long testFileLength = assumeHugeFileExists(testPath);
|
||||||
|
|
||||||
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
||||||
|
@ -230,7 +230,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testValidateSeekBounds() throws Exception {
|
public void testValidateSeekBounds() throws Exception {
|
||||||
Path testPath = new Path(TEST_FILE_PREFIX + "_testValidateSeekBounds");
|
Path testPath = path(TEST_FILE_PREFIX + "_testValidateSeekBounds");
|
||||||
long testFileLength = assumeHugeFileExists(testPath);
|
long testFileLength = assumeHugeFileExists(testPath);
|
||||||
|
|
||||||
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
||||||
|
@ -281,7 +281,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSeekAndAvailableAndPosition() throws Exception {
|
public void testSeekAndAvailableAndPosition() throws Exception {
|
||||||
Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekAndAvailableAndPosition");
|
Path testPath = path(TEST_FILE_PREFIX + "_testSeekAndAvailableAndPosition");
|
||||||
long testFileLength = assumeHugeFileExists(testPath);
|
long testFileLength = assumeHugeFileExists(testPath);
|
||||||
|
|
||||||
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
||||||
|
@ -347,7 +347,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSkipAndAvailableAndPosition() throws Exception {
|
public void testSkipAndAvailableAndPosition() throws Exception {
|
||||||
Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipAndAvailableAndPosition");
|
Path testPath = path(TEST_FILE_PREFIX + "_testSkipAndAvailableAndPosition");
|
||||||
long testFileLength = assumeHugeFileExists(testPath);
|
long testFileLength = assumeHugeFileExists(testPath);
|
||||||
|
|
||||||
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
|
||||||
|
@ -413,7 +413,8 @@ public class ITestAzureBlobFileSystemRandomRead extends
|
||||||
@Test
|
@Test
|
||||||
public void testSequentialReadAfterReverseSeekPerformance()
|
public void testSequentialReadAfterReverseSeekPerformance()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Path testPath = new Path(TEST_FILE_PREFIX + "_testSequentialReadAfterReverseSeekPerformance");
|
Path testPath = path(
|
||||||
|
TEST_FILE_PREFIX + "_testSequentialReadAfterReverseSeekPerformance");
|
||||||
assumeHugeFileExists(testPath);
|
assumeHugeFileExists(testPath);
|
||||||
final int maxAttempts = 10;
|
final int maxAttempts = 10;
|
||||||
final double maxAcceptableRatio = 1.01;
|
final double maxAcceptableRatio = 1.01;
|
||||||
|
@ -446,7 +447,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
|
||||||
public void testRandomReadPerformance() throws Exception {
|
public void testRandomReadPerformance() throws Exception {
|
||||||
Assume.assumeFalse("This test does not support namespace enabled account",
|
Assume.assumeFalse("This test does not support namespace enabled account",
|
||||||
getIsNamespaceEnabled(getFileSystem()));
|
getIsNamespaceEnabled(getFileSystem()));
|
||||||
Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomReadPerformance");
|
Path testPath = path(TEST_FILE_PREFIX + "_testRandomReadPerformance");
|
||||||
assumeHugeFileExists(testPath);
|
assumeHugeFileExists(testPath);
|
||||||
|
|
||||||
final AzureBlobFileSystem abFs = this.getFileSystem();
|
final AzureBlobFileSystem abFs = this.getFileSystem();
|
||||||
|
|
|
@ -51,6 +51,7 @@ import static org.mockito.Mockito.when;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS;
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
|
||||||
|
|
||||||
|
@ -95,13 +96,13 @@ public class ITestAzureBlobFileSystemRename extends
|
||||||
@Test
|
@Test
|
||||||
public void testRenameFileUnderDir() throws Exception {
|
public void testRenameFileUnderDir() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path sourceDir = new Path("/testSrc");
|
Path sourceDir = path("/testSrc");
|
||||||
assertMkdirs(fs, sourceDir);
|
assertMkdirs(fs, sourceDir);
|
||||||
String filename = "file1";
|
String filename = "file1";
|
||||||
Path file1 = new Path(sourceDir, filename);
|
Path file1 = new Path(sourceDir, filename);
|
||||||
touch(file1);
|
touch(file1);
|
||||||
|
|
||||||
Path destDir = new Path("/testDst");
|
Path destDir = path("/testDst");
|
||||||
assertRenameOutcome(fs, sourceDir, destDir, true);
|
assertRenameOutcome(fs, sourceDir, destDir, true);
|
||||||
FileStatus[] fileStatus = fs.listStatus(destDir);
|
FileStatus[] fileStatus = fs.listStatus(destDir);
|
||||||
assertNotNull("Null file status", fileStatus);
|
assertNotNull("Null file status", fileStatus);
|
||||||
|
@ -113,14 +114,15 @@ public class ITestAzureBlobFileSystemRename extends
|
||||||
@Test
|
@Test
|
||||||
public void testRenameDirectory() throws Exception {
|
public void testRenameDirectory() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
fs.mkdirs(new Path("testDir"));
|
Path testDir = path("testDir");
|
||||||
Path test1 = new Path("testDir/test1");
|
fs.mkdirs(testDir);
|
||||||
|
Path test1 = new Path(testDir + "/test1");
|
||||||
fs.mkdirs(test1);
|
fs.mkdirs(test1);
|
||||||
fs.mkdirs(new Path("testDir/test1/test2"));
|
fs.mkdirs(new Path(testDir + "/test1/test2"));
|
||||||
fs.mkdirs(new Path("testDir/test1/test2/test3"));
|
fs.mkdirs(new Path(testDir + "/test1/test2/test3"));
|
||||||
|
|
||||||
assertRenameOutcome(fs, test1,
|
assertRenameOutcome(fs, test1,
|
||||||
new Path("testDir/test10"), true);
|
new Path(testDir + "/test10"), true);
|
||||||
assertPathDoesNotExist(fs, "rename source dir", test1);
|
assertPathDoesNotExist(fs, "rename source dir", test1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,8 +132,9 @@ public class ITestAzureBlobFileSystemRename extends
|
||||||
final List<Future<Void>> tasks = new ArrayList<>();
|
final List<Future<Void>> tasks = new ArrayList<>();
|
||||||
|
|
||||||
ExecutorService es = Executors.newFixedThreadPool(10);
|
ExecutorService es = Executors.newFixedThreadPool(10);
|
||||||
|
Path source = path("/test");
|
||||||
for (int i = 0; i < 1000; i++) {
|
for (int i = 0; i < 1000; i++) {
|
||||||
final Path fileName = new Path("/test/" + i);
|
final Path fileName = new Path(source + "/" + i);
|
||||||
Callable<Void> callable = new Callable<Void>() {
|
Callable<Void> callable = new Callable<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception {
|
public Void call() throws Exception {
|
||||||
|
@ -148,8 +151,7 @@ public class ITestAzureBlobFileSystemRename extends
|
||||||
}
|
}
|
||||||
|
|
||||||
es.shutdownNow();
|
es.shutdownNow();
|
||||||
Path source = new Path("/test");
|
Path dest = path("/renamedDir");
|
||||||
Path dest = new Path("/renamedDir");
|
|
||||||
assertRenameOutcome(fs, source, dest, true);
|
assertRenameOutcome(fs, source, dest, true);
|
||||||
|
|
||||||
FileStatus[] files = fs.listStatus(dest);
|
FileStatus[] files = fs.listStatus(dest);
|
||||||
|
@ -173,14 +175,19 @@ public class ITestAzureBlobFileSystemRename extends
|
||||||
@Test
|
@Test
|
||||||
public void testPosixRenameDirectory() throws Exception {
|
public void testPosixRenameDirectory() throws Exception {
|
||||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||||
fs.mkdirs(new Path("testDir2/test1/test2/test3"));
|
Path testDir2 = path("testDir2");
|
||||||
fs.mkdirs(new Path("testDir2/test4"));
|
fs.mkdirs(new Path(testDir2 + "/test1/test2/test3"));
|
||||||
Assert.assertTrue(fs.rename(new Path("testDir2/test1/test2/test3"), new Path("testDir2/test4")));
|
fs.mkdirs(new Path(testDir2 + "/test4"));
|
||||||
assertTrue(fs.exists(new Path("testDir2")));
|
Assert.assertTrue(fs.rename(new Path(testDir2 + "/test1/test2/test3"), new Path(testDir2 + "/test4")));
|
||||||
assertTrue(fs.exists(new Path("testDir2/test1/test2")));
|
assertPathExists(fs, "This path should exist", testDir2);
|
||||||
assertTrue(fs.exists(new Path("testDir2/test4")));
|
assertPathExists(fs, "This path should exist",
|
||||||
assertTrue(fs.exists(new Path("testDir2/test4/test3")));
|
new Path(testDir2 + "/test1/test2"));
|
||||||
assertFalse(fs.exists(new Path("testDir2/test1/test2/test3")));
|
assertPathExists(fs, "This path should exist",
|
||||||
|
new Path(testDir2 + "/test4"));
|
||||||
|
assertPathExists(fs, "This path should exist",
|
||||||
|
new Path(testDir2 + "/test4/test3"));
|
||||||
|
assertPathDoesNotExist(fs, "This path should not exist",
|
||||||
|
new Path(testDir2 + "/test1/test2/test3"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -306,7 +313,7 @@ public class ITestAzureBlobFileSystemRename extends
|
||||||
when(op.getResult()).thenReturn(http400Op);
|
when(op.getResult()).thenReturn(http400Op);
|
||||||
} else if (renameRequestStatus == HTTP_NOT_FOUND) {
|
} else if (renameRequestStatus == HTTP_NOT_FOUND) {
|
||||||
// Create the file new.
|
// Create the file new.
|
||||||
fs.create(destinationPath);
|
fs.create(destinationPath).close();
|
||||||
when(op.getResult()).thenReturn(http404Op);
|
when(op.getResult()).thenReturn(http404Op);
|
||||||
|
|
||||||
if (isOldOp) {
|
if (isOldOp) {
|
||||||
|
|
|
@ -76,7 +76,7 @@ public class ITestAzureBlobFileSystemRenameUnicode extends
|
||||||
@Test
|
@Test
|
||||||
public void testRenameFileUsingUnicode() throws Exception {
|
public void testRenameFileUsingUnicode() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
Path folderPath1 = new Path(srcDir);
|
Path folderPath1 = path(srcDir);
|
||||||
assertMkdirs(fs, folderPath1);
|
assertMkdirs(fs, folderPath1);
|
||||||
assertIsDirectory(fs, folderPath1);
|
assertIsDirectory(fs, folderPath1);
|
||||||
Path filePath = new Path(folderPath1 + "/" + filename);
|
Path filePath = new Path(folderPath1 + "/" + filename);
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
|
||||||
import static org.junit.Assume.assumeTrue;
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
|
import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
|
||||||
|
@ -1297,7 +1298,7 @@ public class ITestAzureBlobFilesystemAcl extends AbstractAbfsIntegrationTest {
|
||||||
final Path filePath = new Path(methodName.getMethodName());
|
final Path filePath = new Path(methodName.getMethodName());
|
||||||
fs.create(filePath);
|
fs.create(filePath);
|
||||||
|
|
||||||
assertTrue(fs.exists(filePath));
|
assertPathExists(fs, "This path should exist", filePath);
|
||||||
|
|
||||||
TracingHeaderValidator tracingHeaderValidator = new TracingHeaderValidator(
|
TracingHeaderValidator tracingHeaderValidator = new TracingHeaderValidator(
|
||||||
conf.getClientCorrelationId(), fs.getFileSystemId(),
|
conf.getClientCorrelationId(), fs.getFileSystemId(),
|
||||||
|
@ -1320,7 +1321,7 @@ public class ITestAzureBlobFilesystemAcl extends AbstractAbfsIntegrationTest {
|
||||||
final Path filePath = new Path(methodName.getMethodName());
|
final Path filePath = new Path(methodName.getMethodName());
|
||||||
fs.create(filePath);
|
fs.create(filePath);
|
||||||
|
|
||||||
assertTrue(fs.exists(filePath));
|
assertPathExists(fs, "This path should exist", filePath);
|
||||||
FsPermission oldPermission = fs.getFileStatus(filePath).getPermission();
|
FsPermission oldPermission = fs.getFileStatus(filePath).getPermission();
|
||||||
// default permission for non-namespace enabled account is "777"
|
// default permission for non-namespace enabled account is "777"
|
||||||
FsPermission newPermission = new FsPermission("557");
|
FsPermission newPermission = new FsPermission("557");
|
||||||
|
|
|
@ -33,6 +33,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
|
@ -107,7 +108,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
@Test
|
@Test
|
||||||
public void testReadWithCPK() throws Exception {
|
public void testReadWithCPK() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(true);
|
final AzureBlobFileSystem fs = getAbfs(true);
|
||||||
String fileName = "/" + methodName.getMethodName();
|
String fileName = path("/" + methodName.getMethodName()).toString();
|
||||||
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
||||||
|
|
||||||
AbfsClient abfsClient = fs.getAbfsClient();
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
@ -157,7 +158,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
@Test
|
@Test
|
||||||
public void testReadWithoutCPK() throws Exception {
|
public void testReadWithoutCPK() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(false);
|
final AzureBlobFileSystem fs = getAbfs(false);
|
||||||
String fileName = "/" + methodName.getMethodName();
|
String fileName = path("/" + methodName.getMethodName()).toString();
|
||||||
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
||||||
|
|
||||||
AbfsClient abfsClient = fs.getAbfsClient();
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
@ -196,7 +197,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
@Test
|
@Test
|
||||||
public void testAppendWithCPK() throws Exception {
|
public void testAppendWithCPK() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(true);
|
final AzureBlobFileSystem fs = getAbfs(true);
|
||||||
final String fileName = "/" + methodName.getMethodName();
|
final String fileName = path("/" + methodName.getMethodName()).toString();
|
||||||
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
||||||
|
|
||||||
// Trying to append with correct CPK headers
|
// Trying to append with correct CPK headers
|
||||||
|
@ -241,7 +242,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
@Test
|
@Test
|
||||||
public void testAppendWithoutCPK() throws Exception {
|
public void testAppendWithoutCPK() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(false);
|
final AzureBlobFileSystem fs = getAbfs(false);
|
||||||
final String fileName = "/" + methodName.getMethodName();
|
final String fileName = path("/" + methodName.getMethodName()).toString();
|
||||||
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
||||||
|
|
||||||
// Trying to append without CPK headers
|
// Trying to append without CPK headers
|
||||||
|
@ -277,7 +278,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
@Test
|
@Test
|
||||||
public void testSetGetXAttr() throws Exception {
|
public void testSetGetXAttr() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(true);
|
final AzureBlobFileSystem fs = getAbfs(true);
|
||||||
String fileName = methodName.getMethodName();
|
final String fileName = path(methodName.getMethodName()).toString();
|
||||||
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
||||||
|
|
||||||
String valSent = "testValue";
|
String valSent = "testValue";
|
||||||
|
@ -325,7 +326,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
AzureBlobFileSystem fs1 = getAbfs(true);
|
AzureBlobFileSystem fs1 = getAbfs(true);
|
||||||
int fileSize = FILE_SIZE_FOR_COPY_BETWEEN_ACCOUNTS;
|
int fileSize = FILE_SIZE_FOR_COPY_BETWEEN_ACCOUNTS;
|
||||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||||
Path testFilePath = createFileWithContent(fs1, "fs1-file.txt", fileContent);
|
Path testFilePath = createFileWithContent(fs1,
|
||||||
|
String.format("fs1-file%s.txt", UUID.randomUUID()), fileContent);
|
||||||
|
|
||||||
// Create fs2 with different CPK
|
// Create fs2 with different CPK
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
@ -340,7 +342,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
|
AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
|
||||||
|
|
||||||
// Read from fs1 and write to fs2, fs1 and fs2 are having different CPK
|
// Read from fs1 and write to fs2, fs1 and fs2 are having different CPK
|
||||||
Path fs2DestFilePath = new Path("fs2-dest-file.txt");
|
Path fs2DestFilePath = new Path(
|
||||||
|
String.format("fs2-dest-file%s.txt", UUID.randomUUID()));
|
||||||
FSDataOutputStream ops = fs2.create(fs2DestFilePath);
|
FSDataOutputStream ops = fs2.create(fs2DestFilePath);
|
||||||
try (FSDataInputStream iStream = fs1.open(testFilePath)) {
|
try (FSDataInputStream iStream = fs1.open(testFilePath)) {
|
||||||
long totalBytesRead = 0;
|
long totalBytesRead = 0;
|
||||||
|
@ -408,8 +411,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private void testListPath(final boolean isWithCPK) throws Exception {
|
private void testListPath(final boolean isWithCPK) throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
String testDirName = "/" + methodName.getMethodName();
|
final Path testPath = path("/" + methodName.getMethodName());
|
||||||
final Path testPath = new Path(testDirName);
|
String testDirName = testPath.toString();
|
||||||
fs.mkdirs(testPath);
|
fs.mkdirs(testPath);
|
||||||
createFileAndGetContent(fs, testDirName + "/aaa", FILE_SIZE);
|
createFileAndGetContent(fs, testDirName + "/aaa", FILE_SIZE);
|
||||||
createFileAndGetContent(fs, testDirName + "/bbb", FILE_SIZE);
|
createFileAndGetContent(fs, testDirName + "/bbb", FILE_SIZE);
|
||||||
|
@ -468,7 +471,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private void testCreatePath(final boolean isWithCPK) throws Exception {
|
private void testCreatePath(final boolean isWithCPK) throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
final String testFileName = "/" + methodName.getMethodName();
|
final String testFileName = path("/" + methodName.getMethodName())
|
||||||
|
.toString();
|
||||||
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
|
||||||
AbfsClient abfsClient = fs.getAbfsClient();
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
@ -511,7 +515,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private void testRenamePath(final boolean isWithCPK) throws Exception {
|
private void testRenamePath(final boolean isWithCPK) throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
final String testFileName = "/" + methodName.getMethodName();
|
final String testFileName = path("/" + methodName.getMethodName())
|
||||||
|
.toString();
|
||||||
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
|
||||||
FileStatus fileStatusBeforeRename = fs
|
FileStatus fileStatusBeforeRename = fs
|
||||||
|
@ -546,15 +551,17 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private void testFlush(final boolean isWithCPK) throws Exception {
|
private void testFlush(final boolean isWithCPK) throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
final String testFileName = "/" + methodName.getMethodName();
|
final String testFileName = path("/" + methodName.getMethodName())
|
||||||
fs.create(new Path(testFileName));
|
.toString();
|
||||||
|
fs.create(new Path(testFileName)).close();
|
||||||
AbfsClient abfsClient = fs.getAbfsClient();
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
String expectedCPKSha = getCPKSha(fs);
|
String expectedCPKSha = getCPKSha(fs);
|
||||||
|
|
||||||
byte[] fileContent = getRandomBytesArray(FILE_SIZE);
|
byte[] fileContent = getRandomBytesArray(FILE_SIZE);
|
||||||
Path testFilePath = new Path(testFileName + "1");
|
Path testFilePath = new Path(testFileName + "1");
|
||||||
FSDataOutputStream oStream = fs.create(testFilePath);
|
try (FSDataOutputStream oStream = fs.create(testFilePath)) {
|
||||||
oStream.write(fileContent);
|
oStream.write(fileContent);
|
||||||
|
}
|
||||||
|
|
||||||
// Trying to read with different CPK headers
|
// Trying to read with different CPK headers
|
||||||
Configuration conf = fs.getConf();
|
Configuration conf = fs.getConf();
|
||||||
|
@ -605,7 +612,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private void testSetPathProperties(final boolean isWithCPK) throws Exception {
|
private void testSetPathProperties(final boolean isWithCPK) throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
final String testFileName = "/" + methodName.getMethodName();
|
final String testFileName = path("/" + methodName.getMethodName())
|
||||||
|
.toString();
|
||||||
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
|
||||||
AbfsClient abfsClient = fs.getAbfsClient();
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
@ -635,7 +643,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private void testGetPathStatusFile(final boolean isWithCPK) throws Exception {
|
private void testGetPathStatusFile(final boolean isWithCPK) throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
final String testFileName = "/" + methodName.getMethodName();
|
final String testFileName = path("/" + methodName.getMethodName())
|
||||||
|
.toString();
|
||||||
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
|
||||||
AbfsClient abfsClient = fs.getAbfsClient();
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
@ -672,7 +681,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private void testDeletePath(final boolean isWithCPK) throws Exception {
|
private void testDeletePath(final boolean isWithCPK) throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
final String testFileName = "/" + methodName.getMethodName();
|
final String testFileName = path("/" + methodName.getMethodName())
|
||||||
|
.toString();
|
||||||
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
|
||||||
FileStatus[] listStatuses = fs.listStatus(new Path(testFileName));
|
FileStatus[] listStatuses = fs.listStatus(new Path(testFileName));
|
||||||
|
@ -702,7 +712,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private void testSetPermission(final boolean isWithCPK) throws Exception {
|
private void testSetPermission(final boolean isWithCPK) throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
final String testFileName = "/" + methodName.getMethodName();
|
final String testFileName = path("/" + methodName.getMethodName())
|
||||||
|
.toString();
|
||||||
Assume.assumeTrue(fs.getIsNamespaceEnabled(getTestTracingContext(fs, false)));
|
Assume.assumeTrue(fs.getIsNamespaceEnabled(getTestTracingContext(fs, false)));
|
||||||
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
AbfsClient abfsClient = fs.getAbfsClient();
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
@ -727,7 +738,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private void testSetAcl(final boolean isWithCPK) throws Exception {
|
private void testSetAcl(final boolean isWithCPK) throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
final String testFileName = "/" + methodName.getMethodName();
|
final String testFileName = path("/" + methodName.getMethodName())
|
||||||
|
.toString();
|
||||||
TracingContext tracingContext = getTestTracingContext(fs, false);
|
TracingContext tracingContext = getTestTracingContext(fs, false);
|
||||||
Assume.assumeTrue(fs.getIsNamespaceEnabled(tracingContext));
|
Assume.assumeTrue(fs.getIsNamespaceEnabled(tracingContext));
|
||||||
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
@ -756,7 +768,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private void testGetAcl(final boolean isWithCPK) throws Exception {
|
private void testGetAcl(final boolean isWithCPK) throws Exception {
|
||||||
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
final String testFileName = "/" + methodName.getMethodName();
|
final String testFileName = path("/" + methodName.getMethodName())
|
||||||
|
.toString();
|
||||||
TracingContext tracingContext = getTestTracingContext(fs, false);
|
TracingContext tracingContext = getTestTracingContext(fs, false);
|
||||||
Assume.assumeTrue(fs.getIsNamespaceEnabled(tracingContext));
|
Assume.assumeTrue(fs.getIsNamespaceEnabled(tracingContext));
|
||||||
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
@ -786,8 +799,9 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
getAuthType() == AuthType.OAuth);
|
getAuthType() == AuthType.OAuth);
|
||||||
|
|
||||||
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
final String testFileName = "/" + methodName.getMethodName();
|
final String testFileName = path("/" + methodName.getMethodName())
|
||||||
fs.create(new Path(testFileName));
|
.toString();
|
||||||
|
fs.create(new Path(testFileName)).close();
|
||||||
AbfsClient abfsClient = fs.getAbfsClient();
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
AbfsRestOperation abfsRestOperation = abfsClient
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
.checkAccess(testFileName, "rwx", getTestTracingContext(fs, false));
|
.checkAccess(testFileName, "rwx", getTestTracingContext(fs, false));
|
||||||
|
|
|
@ -33,16 +33,22 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||||
*/
|
*/
|
||||||
public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest {
|
public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest {
|
||||||
private static final int TEST_DATA = 100;
|
private static final int TEST_DATA = 100;
|
||||||
private static final Path TEST_PATH = new Path("/testfile");
|
private static final String TEST_PATH = "/testfile";
|
||||||
public ITestFileSystemProperties() throws Exception {
|
public ITestFileSystemProperties() throws Exception {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReadWriteBytesToFileAndEnsureThreadPoolCleanup() throws Exception {
|
public void testReadWriteBytesToFileAndEnsureThreadPoolCleanup() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
testWriteOneByteToFileAndEnsureThreadPoolCleanup();
|
Path testPath = path(TEST_PATH);
|
||||||
|
try(FSDataOutputStream stream = fs.create(testPath)) {
|
||||||
|
stream.write(TEST_DATA);
|
||||||
|
}
|
||||||
|
|
||||||
try(FSDataInputStream inputStream = fs.open(TEST_PATH, 4 * 1024 * 1024)) {
|
FileStatus fileStatus = fs.getFileStatus(testPath);
|
||||||
|
assertEquals(1, fileStatus.getLen());
|
||||||
|
|
||||||
|
try(FSDataInputStream inputStream = fs.open(testPath, 4 * 1024 * 1024)) {
|
||||||
int i = inputStream.read();
|
int i = inputStream.read();
|
||||||
assertEquals(TEST_DATA, i);
|
assertEquals(TEST_DATA, i);
|
||||||
}
|
}
|
||||||
|
@ -51,11 +57,12 @@ public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest {
|
||||||
@Test
|
@Test
|
||||||
public void testWriteOneByteToFileAndEnsureThreadPoolCleanup() throws Exception {
|
public void testWriteOneByteToFileAndEnsureThreadPoolCleanup() throws Exception {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
try(FSDataOutputStream stream = fs.create(TEST_PATH)) {
|
Path testPath = path(TEST_PATH);
|
||||||
|
try(FSDataOutputStream stream = fs.create(testPath)) {
|
||||||
stream.write(TEST_DATA);
|
stream.write(TEST_DATA);
|
||||||
}
|
}
|
||||||
|
|
||||||
FileStatus fileStatus = fs.getFileStatus(TEST_PATH);
|
FileStatus fileStatus = fs.getFileStatus(testPath);
|
||||||
assertEquals(1, fileStatus.getLen());
|
assertEquals(1, fileStatus.getLen());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,11 +85,12 @@ public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
final Hashtable<String, String> properties = new Hashtable<>();
|
final Hashtable<String, String> properties = new Hashtable<>();
|
||||||
properties.put("key", "{ value: valueTest }");
|
properties.put("key", "{ value: valueTest }");
|
||||||
touch(TEST_PATH);
|
Path testPath = path(TEST_PATH);
|
||||||
|
touch(testPath);
|
||||||
TracingContext tracingContext = getTestTracingContext(fs, true);
|
TracingContext tracingContext = getTestTracingContext(fs, true);
|
||||||
fs.getAbfsStore().setPathProperties(TEST_PATH, properties, tracingContext);
|
fs.getAbfsStore().setPathProperties(testPath, properties, tracingContext);
|
||||||
Hashtable<String, String> fetchedProperties = fs.getAbfsStore()
|
Hashtable<String, String> fetchedProperties = fs.getAbfsStore()
|
||||||
.getPathStatus(TEST_PATH, tracingContext);
|
.getPathStatus(testPath, tracingContext);
|
||||||
|
|
||||||
assertEquals(properties, fetchedProperties);
|
assertEquals(properties, fetchedProperties);
|
||||||
}
|
}
|
||||||
|
@ -105,11 +113,12 @@ public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest {
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
final Hashtable<String, String> properties = new Hashtable<>();
|
final Hashtable<String, String> properties = new Hashtable<>();
|
||||||
properties.put("key", "{ value: valueTest兩 }");
|
properties.put("key", "{ value: valueTest兩 }");
|
||||||
touch(TEST_PATH);
|
Path testPath = path(TEST_PATH);
|
||||||
|
touch(testPath);
|
||||||
TracingContext tracingContext = getTestTracingContext(fs, true);
|
TracingContext tracingContext = getTestTracingContext(fs, true);
|
||||||
fs.getAbfsStore().setPathProperties(TEST_PATH, properties, tracingContext);
|
fs.getAbfsStore().setPathProperties(testPath, properties, tracingContext);
|
||||||
Hashtable<String, String> fetchedProperties = fs.getAbfsStore()
|
Hashtable<String, String> fetchedProperties = fs.getAbfsStore()
|
||||||
.getPathStatus(TEST_PATH, tracingContext);
|
.getPathStatus(testPath, tracingContext);
|
||||||
|
|
||||||
assertEquals(properties, fetchedProperties);
|
assertEquals(properties, fetchedProperties);
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,8 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
NativeAzureFileSystem wasb = getWasbFileSystem();
|
NativeAzureFileSystem wasb = getWasbFileSystem();
|
||||||
|
|
||||||
Path path1 = new Path("/testfiles/~12/!008/3/abFsTestfile");
|
Path testFiles = path("/testfiles");
|
||||||
|
Path path1 = new Path(testFiles + "/~12/!008/3/abFsTestfile");
|
||||||
try(FSDataOutputStream abfsStream = fs.create(path1, true)) {
|
try(FSDataOutputStream abfsStream = fs.create(path1, true)) {
|
||||||
abfsStream.write(ABFS_TEST_CONTEXT.getBytes());
|
abfsStream.write(ABFS_TEST_CONTEXT.getBytes());
|
||||||
abfsStream.flush();
|
abfsStream.flush();
|
||||||
|
@ -70,7 +71,7 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
// create file using wasb
|
// create file using wasb
|
||||||
Path path2 = new Path("/testfiles/~12/!008/3/nativeFsTestfile");
|
Path path2 = new Path(testFiles + "/~12/!008/3/nativeFsTestfile");
|
||||||
LOG.info("{}", wasb.getUri());
|
LOG.info("{}", wasb.getUri());
|
||||||
try(FSDataOutputStream nativeFsStream = wasb.create(path2, true)) {
|
try(FSDataOutputStream nativeFsStream = wasb.create(path2, true)) {
|
||||||
nativeFsStream.write(WASB_TEST_CONTEXT.getBytes());
|
nativeFsStream.write(WASB_TEST_CONTEXT.getBytes());
|
||||||
|
@ -78,8 +79,8 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
|
||||||
nativeFsStream.hsync();
|
nativeFsStream.hsync();
|
||||||
}
|
}
|
||||||
// list file using abfs and wasb
|
// list file using abfs and wasb
|
||||||
FileStatus[] abfsFileStatus = fs.listStatus(new Path("/testfiles/~12/!008/3/"));
|
FileStatus[] abfsFileStatus = fs.listStatus(new Path(testFiles + "/~12/!008/3/"));
|
||||||
FileStatus[] nativeFsFileStatus = wasb.listStatus(new Path("/testfiles/~12/!008/3/"));
|
FileStatus[] nativeFsFileStatus = wasb.listStatus(new Path(testFiles + "/~12/!008/3/"));
|
||||||
|
|
||||||
assertEquals(2, abfsFileStatus.length);
|
assertEquals(2, abfsFileStatus.length);
|
||||||
assertEquals(2, nativeFsFileStatus.length);
|
assertEquals(2, nativeFsFileStatus.length);
|
||||||
|
@ -97,8 +98,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
NativeAzureFileSystem wasb = getWasbFileSystem();
|
NativeAzureFileSystem wasb = getWasbFileSystem();
|
||||||
|
|
||||||
|
Path testFile = path("/testReadFile");
|
||||||
for (int i = 0; i< 4; i++) {
|
for (int i = 0; i< 4; i++) {
|
||||||
Path path = new Path("/testReadFile/~12/!008/testfile" + i);
|
Path path = new Path(testFile + "/~12/!008/testfile" + i);
|
||||||
final FileSystem createFs = createFileWithAbfs[i] ? abfs : wasb;
|
final FileSystem createFs = createFileWithAbfs[i] ? abfs : wasb;
|
||||||
|
|
||||||
// Write
|
// Write
|
||||||
|
@ -137,8 +139,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
NativeAzureFileSystem wasb = getWasbFileSystem();
|
NativeAzureFileSystem wasb = getWasbFileSystem();
|
||||||
|
|
||||||
|
Path testDir = path("/testDir");
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
Path path = new Path("/testDir/t" + i);
|
Path path = new Path(testDir + "/t" + i);
|
||||||
//create
|
//create
|
||||||
final FileSystem createFs = createDirWithAbfs[i] ? abfs : wasb;
|
final FileSystem createFs = createDirWithAbfs[i] ? abfs : wasb;
|
||||||
assertTrue(createFs.mkdirs(path));
|
assertTrue(createFs.mkdirs(path));
|
||||||
|
@ -172,11 +175,12 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
NativeAzureFileSystem wasb = getWasbFileSystem();
|
NativeAzureFileSystem wasb = getWasbFileSystem();
|
||||||
|
|
||||||
Path d1d4 = new Path("/d1/d2/d3/d4");
|
Path d1 = path("/d1");
|
||||||
|
Path d1d4 = new Path(d1 + "/d2/d3/d4");
|
||||||
assertMkdirs(abfs, d1d4);
|
assertMkdirs(abfs, d1d4);
|
||||||
|
|
||||||
//set working directory to path1
|
//set working directory to path1
|
||||||
Path path1 = new Path("/d1/d2");
|
Path path1 = new Path(d1 + "/d2");
|
||||||
wasb.setWorkingDirectory(path1);
|
wasb.setWorkingDirectory(path1);
|
||||||
abfs.setWorkingDirectory(path1);
|
abfs.setWorkingDirectory(path1);
|
||||||
assertEquals(path1, wasb.getWorkingDirectory());
|
assertEquals(path1, wasb.getWorkingDirectory());
|
||||||
|
|
|
@ -23,7 +23,6 @@ import org.junit.Test;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||||
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||||
|
@ -32,7 +31,7 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||||
* Test create operation.
|
* Test create operation.
|
||||||
*/
|
*/
|
||||||
public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
|
public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
|
||||||
private static final Path TEST_FILE_PATH = new Path("testfile");
|
private static final String TEST_FILE_PATH = "testfile";
|
||||||
|
|
||||||
public ITestAbfsOutputStream() throws Exception {
|
public ITestAbfsOutputStream() throws Exception {
|
||||||
super();
|
super();
|
||||||
|
@ -42,7 +41,7 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
|
||||||
public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
|
public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
|
||||||
Configuration conf = getRawConfiguration();
|
Configuration conf = getRawConfiguration();
|
||||||
final AzureBlobFileSystem fs = getFileSystem(conf);
|
final AzureBlobFileSystem fs = getFileSystem(conf);
|
||||||
try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
|
try (FSDataOutputStream out = fs.create(path(TEST_FILE_PATH))) {
|
||||||
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
|
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
|
||||||
|
|
||||||
int maxConcurrentRequests
|
int maxConcurrentRequests
|
||||||
|
@ -71,19 +70,18 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
|
||||||
conf.set(ConfigurationKeys.AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
|
conf.set(ConfigurationKeys.AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
|
||||||
"" + maxRequestsToQueue);
|
"" + maxRequestsToQueue);
|
||||||
final AzureBlobFileSystem fs = getFileSystem(conf);
|
final AzureBlobFileSystem fs = getFileSystem(conf);
|
||||||
FSDataOutputStream out = fs.create(TEST_FILE_PATH);
|
try (FSDataOutputStream out = fs.create(path(TEST_FILE_PATH))) {
|
||||||
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
|
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
|
||||||
|
|
||||||
if (stream.isAppendBlobStream()) {
|
if (stream.isAppendBlobStream()) {
|
||||||
maxConcurrentRequests = 1;
|
maxConcurrentRequests = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
Assertions.assertThat(stream.getMaxConcurrentRequestCount())
|
Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
|
||||||
.describedAs("maxConcurrentRequests should be " + maxConcurrentRequests)
|
"maxConcurrentRequests should be " + maxConcurrentRequests).isEqualTo(maxConcurrentRequests);
|
||||||
.isEqualTo(maxConcurrentRequests);
|
Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs("maxRequestsToQueue should be " + maxRequestsToQueue)
|
||||||
Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued())
|
|
||||||
.describedAs("maxRequestsToQueue should be " + maxRequestsToQueue)
|
|
||||||
.isEqualTo(maxRequestsToQueue);
|
.isEqualTo(maxRequestsToQueue);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -583,13 +583,14 @@ public class TestAbfsInputStream extends
|
||||||
Configuration config = getRawConfiguration();
|
Configuration config = getRawConfiguration();
|
||||||
config.unset(FS_AZURE_READ_AHEAD_QUEUE_DEPTH);
|
config.unset(FS_AZURE_READ_AHEAD_QUEUE_DEPTH);
|
||||||
AzureBlobFileSystem fs = getFileSystem(config);
|
AzureBlobFileSystem fs = getFileSystem(config);
|
||||||
Path testFile = new Path("/testFile");
|
Path testFile = path("/testFile");
|
||||||
fs.create(testFile);
|
fs.create(testFile).close();
|
||||||
FSDataInputStream in = fs.open(testFile);
|
FSDataInputStream in = fs.open(testFile);
|
||||||
Assertions.assertThat(
|
Assertions.assertThat(
|
||||||
((AbfsInputStream) in.getWrappedStream()).getReadAheadQueueDepth())
|
((AbfsInputStream) in.getWrappedStream()).getReadAheadQueueDepth())
|
||||||
.describedAs("readahead queue depth should be set to default value 2")
|
.describedAs("readahead queue depth should be set to default value 2")
|
||||||
.isEqualTo(2);
|
.isEqualTo(2);
|
||||||
|
in.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -646,8 +647,7 @@ public class TestAbfsInputStream extends
|
||||||
readAheadBlockSize = readRequestSize;
|
readAheadBlockSize = readRequestSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
Path testPath = new Path(
|
Path testPath = path("/testReadAheadConfigs");
|
||||||
"/testReadAheadConfigs");
|
|
||||||
final AzureBlobFileSystem fs = createTestFile(testPath,
|
final AzureBlobFileSystem fs = createTestFile(testPath,
|
||||||
ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, config);
|
ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, config);
|
||||||
byte[] byteBuffer = new byte[ONE_MB];
|
byte[] byteBuffer = new byte[ONE_MB];
|
||||||
|
|
Loading…
Reference in New Issue