HADOOP-12533. Introduce FileNotFoundException in WASB for read and seek API. Contributed by Dushyanth.
(cherry picked from commit 28790692624177d89fb1e4f59e2f83a659fc3089) Conflicts: hadoop-common-project/hadoop-common/CHANGES.txt
This commit is contained in:
parent
d65ca63c68
commit
c260d7ec78
|
@ -699,6 +699,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HADOOP-9242. Duplicate surefire plugin config in hadoop-common.
|
HADOOP-9242. Duplicate surefire plugin config in hadoop-common.
|
||||||
(Andrey Klochkov via suresh)
|
(Andrey Klochkov via suresh)
|
||||||
|
|
||||||
|
HADOOP-12533. Introduce FileNotFoundException in WASB for read and seek API.
|
||||||
|
(Dushyanth via cnauroth)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString()
|
HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString()
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.fs.azure;
|
package org.apache.hadoop.fs.azure;
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
|
import java.io.EOFException;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
@ -49,6 +50,7 @@ import org.apache.hadoop.fs.BufferedFSInputStream;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||||
import org.apache.hadoop.fs.FSInputStream;
|
import org.apache.hadoop.fs.FSInputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -62,7 +64,6 @@ import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.codehaus.jackson.JsonNode;
|
import org.codehaus.jackson.JsonNode;
|
||||||
|
@ -74,9 +75,11 @@ import org.codehaus.jackson.map.ObjectMapper;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.microsoft.azure.storage.AccessCondition;
|
import com.microsoft.azure.storage.AccessCondition;
|
||||||
import com.microsoft.azure.storage.OperationContext;
|
import com.microsoft.azure.storage.OperationContext;
|
||||||
|
import com.microsoft.azure.storage.StorageErrorCode;
|
||||||
import com.microsoft.azure.storage.StorageException;
|
import com.microsoft.azure.storage.StorageException;
|
||||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||||
import com.microsoft.azure.storage.core.*;
|
import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link FileSystem} for reading and writing files stored on <a
|
* A {@link FileSystem} for reading and writing files stored on <a
|
||||||
|
@ -88,7 +91,6 @@ import com.microsoft.azure.storage.core.*;
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public class NativeAzureFileSystem extends FileSystem {
|
public class NativeAzureFileSystem extends FileSystem {
|
||||||
private static final int USER_WX_PERMISION = 0300;
|
private static final int USER_WX_PERMISION = 0300;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A description of a folder rename operation, including the source and
|
* A description of a folder rename operation, including the source and
|
||||||
* destination keys, and descriptions of the files in the source folder.
|
* destination keys, and descriptions of the files in the source folder.
|
||||||
|
@ -712,7 +714,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
* @returns int An integer corresponding to the byte read.
|
* @returns int An integer corresponding to the byte read.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized int read() throws IOException {
|
public synchronized int read() throws FileNotFoundException, IOException {
|
||||||
try {
|
try {
|
||||||
int result = 0;
|
int result = 0;
|
||||||
result = in.read();
|
result = in.read();
|
||||||
|
@ -726,12 +728,20 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
//
|
//
|
||||||
return result;
|
return result;
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
if (e.getCause() instanceof StorageException) {
|
|
||||||
StorageException storageExcp = (StorageException) e.getCause();
|
Throwable innerException = checkForAzureStorageException(e);
|
||||||
|
|
||||||
|
if (innerException instanceof StorageException) {
|
||||||
|
|
||||||
LOG.error("Encountered Storage Exception for read on Blob : {}"
|
LOG.error("Encountered Storage Exception for read on Blob : {}"
|
||||||
+ " Exception details: {} Error Code : {}",
|
+ " Exception details: {} Error Code : {}",
|
||||||
key, e.getMessage(), storageExcp.getErrorCode());
|
key, e, ((StorageException) innerException).getErrorCode());
|
||||||
|
|
||||||
|
if (isFileNotFoundException((StorageException) innerException)) {
|
||||||
|
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -757,7 +767,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
* there is no more data because the end of stream is reached.
|
* there is no more data because the end of stream is reached.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized int read(byte[] b, int off, int len) throws IOException {
|
public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException, IOException {
|
||||||
try {
|
try {
|
||||||
int result = 0;
|
int result = 0;
|
||||||
result = in.read(b, off, len);
|
result = in.read(b, off, len);
|
||||||
|
@ -772,29 +782,56 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
// Return to the caller with the result.
|
// Return to the caller with the result.
|
||||||
return result;
|
return result;
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
if (e.getCause() instanceof StorageException) {
|
|
||||||
StorageException storageExcp = (StorageException) e.getCause();
|
Throwable innerException = checkForAzureStorageException(e);
|
||||||
|
|
||||||
|
if (innerException instanceof StorageException) {
|
||||||
|
|
||||||
LOG.error("Encountered Storage Exception for read on Blob : {}"
|
LOG.error("Encountered Storage Exception for read on Blob : {}"
|
||||||
+ " Exception details: {} Error Code : {}",
|
+ " Exception details: {} Error Code : {}",
|
||||||
key, e.getMessage(), storageExcp.getErrorCode());
|
key, e, ((StorageException) innerException).getErrorCode());
|
||||||
|
|
||||||
|
if (isFileNotFoundException((StorageException) innerException)) {
|
||||||
|
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public synchronized void close() throws IOException {
|
||||||
in.close();
|
if (!closed) {
|
||||||
closed = true;
|
closed = true;
|
||||||
|
IOUtils.closeStream(in);
|
||||||
|
in = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void seek(long pos) throws IOException {
|
public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException {
|
||||||
in.close();
|
try {
|
||||||
|
checkNotClosed();
|
||||||
|
if (pos < 0) {
|
||||||
|
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
|
||||||
|
}
|
||||||
|
IOUtils.closeStream(in);
|
||||||
in = store.retrieve(key);
|
in = store.retrieve(key);
|
||||||
this.pos = in.skip(pos);
|
this.pos = in.skip(pos);
|
||||||
LOG.debug("Seek to position {}. Bytes skipped {}", pos,
|
LOG.debug("Seek to position {}. Bytes skipped {}", pos,
|
||||||
this.pos);
|
this.pos);
|
||||||
|
} catch(IOException e) {
|
||||||
|
|
||||||
|
Throwable innerException = checkForAzureStorageException(e);
|
||||||
|
|
||||||
|
if (innerException instanceof StorageException
|
||||||
|
&& isFileNotFoundException((StorageException) innerException)) {
|
||||||
|
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||||
|
}
|
||||||
|
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -806,6 +843,50 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
public boolean seekToNewSource(long targetPos) throws IOException {
|
public boolean seekToNewSource(long targetPos) throws IOException {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Helper method to recursively check if the cause of the exception is
|
||||||
|
* a Azure storage exception.
|
||||||
|
*/
|
||||||
|
private Throwable checkForAzureStorageException(IOException e) {
|
||||||
|
|
||||||
|
Throwable innerException = e.getCause();
|
||||||
|
|
||||||
|
while (innerException != null
|
||||||
|
&& !(innerException instanceof StorageException)) {
|
||||||
|
innerException = innerException.getCause();
|
||||||
|
}
|
||||||
|
|
||||||
|
return innerException;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Helper method to check if the AzureStorageException is
|
||||||
|
* because backing blob was not found.
|
||||||
|
*/
|
||||||
|
private boolean isFileNotFoundException(StorageException e) {
|
||||||
|
|
||||||
|
String errorCode = ((StorageException) e).getErrorCode();
|
||||||
|
if (errorCode != null
|
||||||
|
&& (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
|
||||||
|
|| errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
|
||||||
|
|| errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
|
||||||
|
|| errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Helper method to check if a stream is closed.
|
||||||
|
*/
|
||||||
|
private void checkNotClosed() throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class NativeAzureFsOutputStream extends OutputStream {
|
private class NativeAzureFsOutputStream extends OutputStream {
|
||||||
|
|
|
@ -0,0 +1,131 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azure;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestFileSystemOperationExceptionHandling extends
|
||||||
|
NativeAzureFileSystemBaseTest {
|
||||||
|
|
||||||
|
FSDataInputStream inputStream = null;
|
||||||
|
/*
|
||||||
|
* Helper method to create a PageBlob test storage account.
|
||||||
|
*/
|
||||||
|
private AzureBlobStorageTestAccount getPageBlobTestStorageAccount()
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
// Configure the page blob directories key so every file created is a page blob.
|
||||||
|
conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
|
||||||
|
|
||||||
|
// Configure the atomic rename directories key so every folder will have
|
||||||
|
// atomic rename applied.
|
||||||
|
conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
|
||||||
|
return AzureBlobStorageTestAccount.create(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Helper method that creates a InputStream to validate exceptions
|
||||||
|
* for various scenarios
|
||||||
|
*/
|
||||||
|
private void setupInputStreamToTest(AzureBlobStorageTestAccount testAccount)
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
fs = testAccount.getFileSystem();
|
||||||
|
|
||||||
|
// Step 1: Create a file and write dummy data.
|
||||||
|
Path testFilePath1 = new Path("test1.dat");
|
||||||
|
Path testFilePath2 = new Path("test2.dat");
|
||||||
|
FSDataOutputStream outputStream = fs.create(testFilePath1);
|
||||||
|
String testString = "This is a test string";
|
||||||
|
outputStream.write(testString.getBytes());
|
||||||
|
outputStream.close();
|
||||||
|
|
||||||
|
// Step 2: Open a read stream on the file.
|
||||||
|
inputStream = fs.open(testFilePath1);
|
||||||
|
|
||||||
|
// Step 3: Rename the file
|
||||||
|
fs.rename(testFilePath1, testFilePath2);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Tests a basic single threaded read scenario for Page blobs.
|
||||||
|
*/
|
||||||
|
@Test(expected=FileNotFoundException.class)
|
||||||
|
public void testSingleThreadedPageBlobReadScenario() throws Throwable {
|
||||||
|
AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
|
||||||
|
setupInputStreamToTest(testAccount);
|
||||||
|
byte[] readBuffer = new byte[512];
|
||||||
|
inputStream.read(readBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Tests a basic single threaded seek scenario for Page blobs.
|
||||||
|
*/
|
||||||
|
@Test(expected=FileNotFoundException.class)
|
||||||
|
public void testSingleThreadedPageBlobSeekScenario() throws Throwable {
|
||||||
|
AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
|
||||||
|
setupInputStreamToTest(testAccount);
|
||||||
|
inputStream.seek(5);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test a basic single thread seek scenario for Block blobs.
|
||||||
|
*/
|
||||||
|
@Test(expected=FileNotFoundException.class)
|
||||||
|
public void testSingleThreadBlockBlobSeekScenario() throws Throwable {
|
||||||
|
|
||||||
|
AzureBlobStorageTestAccount testAccount = createTestAccount();
|
||||||
|
setupInputStreamToTest(testAccount);
|
||||||
|
inputStream.seek(5);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Tests a basic single threaded read scenario for Block blobs.
|
||||||
|
*/
|
||||||
|
@Test(expected=FileNotFoundException.class)
|
||||||
|
public void testSingledThreadBlockBlobReadScenario() throws Throwable{
|
||||||
|
AzureBlobStorageTestAccount testAccount = createTestAccount();
|
||||||
|
setupInputStreamToTest(testAccount);
|
||||||
|
byte[] readBuffer = new byte[512];
|
||||||
|
inputStream.read(readBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (inputStream != null) {
|
||||||
|
inputStream.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||||
|
return AzureBlobStorageTestAccount.create();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,185 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azure;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestFileSystemOperationsExceptionHandlingMultiThreaded extends
|
||||||
|
NativeAzureFileSystemBaseTest {
|
||||||
|
|
||||||
|
FSDataInputStream inputStream = null;
|
||||||
|
/*
|
||||||
|
* Helper method to creates an input stream to test various scenarios.
|
||||||
|
*/
|
||||||
|
private void getInputStreamToTest(FileSystem fs, Path testPath) throws Throwable {
|
||||||
|
|
||||||
|
FSDataOutputStream outputStream = fs.create(testPath);
|
||||||
|
String testString = "This is a test string";
|
||||||
|
outputStream.write(testString.getBytes());
|
||||||
|
outputStream.close();
|
||||||
|
|
||||||
|
inputStream = fs.open(testPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test to validate correct exception is thrown for Multithreaded read
|
||||||
|
* scenario for block blobs
|
||||||
|
*/
|
||||||
|
@Test(expected=FileNotFoundException.class)
|
||||||
|
public void testMultiThreadedBlockBlobReadScenario() throws Throwable {
|
||||||
|
|
||||||
|
AzureBlobStorageTestAccount testAccount = createTestAccount();
|
||||||
|
fs = testAccount.getFileSystem();
|
||||||
|
Path testFilePath1 = new Path("test1.dat");
|
||||||
|
|
||||||
|
getInputStreamToTest(fs, testFilePath1);
|
||||||
|
Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
|
||||||
|
renameThread.start();
|
||||||
|
|
||||||
|
renameThread.join();
|
||||||
|
|
||||||
|
byte[] readBuffer = new byte[512];
|
||||||
|
inputStream.read(readBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test to validate correct exception is thrown for Multithreaded seek
|
||||||
|
* scenario for block blobs
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Test(expected=FileNotFoundException.class)
|
||||||
|
public void testMultiThreadBlockBlobSeekScenario() throws Throwable {
|
||||||
|
|
||||||
|
AzureBlobStorageTestAccount testAccount = createTestAccount();
|
||||||
|
fs = testAccount.getFileSystem();
|
||||||
|
Path testFilePath1 = new Path("test1.dat");
|
||||||
|
|
||||||
|
getInputStreamToTest(fs, testFilePath1);
|
||||||
|
Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
|
||||||
|
renameThread.start();
|
||||||
|
|
||||||
|
renameThread.join();
|
||||||
|
|
||||||
|
inputStream.seek(5);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test to validate correct exception is thrown for Multithreaded read
|
||||||
|
* scenario for page blobs
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Test(expected=FileNotFoundException.class)
|
||||||
|
public void testMultiThreadedPageBlobReadScenario() throws Throwable {
|
||||||
|
|
||||||
|
AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
|
||||||
|
fs = testAccount.getFileSystem();
|
||||||
|
Path testFilePath1 = new Path("test1.dat");
|
||||||
|
|
||||||
|
getInputStreamToTest(fs, testFilePath1);
|
||||||
|
Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
|
||||||
|
renameThread.start();
|
||||||
|
|
||||||
|
renameThread.join();
|
||||||
|
byte[] readBuffer = new byte[512];
|
||||||
|
inputStream.read(readBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test to validate correct exception is thrown for Multithreaded seek
|
||||||
|
* scenario for page blobs
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Test(expected=FileNotFoundException.class)
|
||||||
|
public void testMultiThreadedPageBlobSeekScenario() throws Throwable {
|
||||||
|
|
||||||
|
AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
|
||||||
|
fs = testAccount.getFileSystem();
|
||||||
|
Path testFilePath1 = new Path("test1.dat");
|
||||||
|
|
||||||
|
getInputStreamToTest(fs, testFilePath1);
|
||||||
|
Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
|
||||||
|
renameThread.start();
|
||||||
|
|
||||||
|
renameThread.join();
|
||||||
|
inputStream.seek(5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||||
|
return AzureBlobStorageTestAccount.create();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Helper method to create a PageBlob test storage account.
|
||||||
|
*/
|
||||||
|
private AzureBlobStorageTestAccount getPageBlobTestStorageAccount()
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
// Configure the page blob directories key so every file created is a page blob.
|
||||||
|
conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
|
||||||
|
|
||||||
|
// Configure the atomic rename directories key so every folder will have
|
||||||
|
// atomic rename applied.
|
||||||
|
conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
|
||||||
|
return AzureBlobStorageTestAccount.create(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (inputStream != null) {
|
||||||
|
inputStream.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Helper thread that just renames the test file.
|
||||||
|
*/
|
||||||
|
class RenameThread implements Runnable {
|
||||||
|
|
||||||
|
private FileSystem fs;
|
||||||
|
private Path testPath;
|
||||||
|
private Path renamePath = new Path("test2.dat");
|
||||||
|
|
||||||
|
public RenameThread(FileSystem fs, Path testPath) {
|
||||||
|
this.fs = fs;
|
||||||
|
this.testPath = testPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(){
|
||||||
|
try {
|
||||||
|
fs.rename(testPath, renamePath);
|
||||||
|
}catch (Exception e) {
|
||||||
|
// Swallowing the exception as the
|
||||||
|
// correctness of the test is controlled
|
||||||
|
// by the other thread
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue