Added HdfsBlobStoreContainer tests

Added BlobContainer tests for HDFS storage
and caught a bug at the same time in which
deleteBlob was not raising an IOException
when the blobName did not exist.
This commit is contained in:
gfyoung 2016-06-25 11:20:43 -04:00 committed by Ali Beyad
parent 5eb4797955
commit dfcdadb59f
4 changed files with 78 additions and 3 deletions

View File

@ -87,7 +87,7 @@ public class FsBlobContainer extends AbstractBlobContainer {
Path blobPath = path.resolve(blobName);
if (!Files.deleteIfExists(blobPath)) {
// blobPath does not exist
throw new IOException("File " + blobPath.toString() + " does not exist");
throw new IOException("File [" + blobPath.toString() + "] does not exist");
}
}

View File

@ -99,7 +99,7 @@ public class AzureStorageServiceMock extends AbstractComponent implements AzureS
MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
for (String blobName : blobs.keySet()) {
final String checkBlob;
if (keyPath != null || keyPath.isEmpty()) {
if (keyPath != null && !keyPath.isEmpty()) {
// strip off key path from the beginning of the blob name
checkBlob = blobName.replace(keyPath, "");
} else {

View File

@ -68,6 +68,10 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public void deleteBlob(String blobName) throws IOException {
if (!blobExists(blobName)) {
throw new IOException("Blob [" + blobName + "] does not exist");
}
store.execute(new Operation<Boolean>() {
@Override
public Boolean run(FileContext fileContext) throws IOException {

View File

@ -16,18 +16,89 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import javax.security.auth.Subject;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.Collections;
public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
@Override
protected BlobStore newBlobStore() throws IOException {
return new HdfsBlobStore(FileContext.getFileContext(), "", 100);
return AccessController.doPrivileged(
new PrivilegedAction<HdfsBlobStore>() {
@Override
public HdfsBlobStore run() {
try {
FileContext fileContext = createContext(new URI("hdfs:///"));
return new HdfsBlobStore(fileContext, "temp", 1024);
} catch (IOException | URISyntaxException e) {
throw new RuntimeException(e);
}
}
});
}
public FileContext createContext(URI uri) {
// mirrors HdfsRepository.java behaviour
Configuration cfg = new Configuration(true);
cfg.setClassLoader(HdfsRepository.class.getClassLoader());
cfg.reloadConfiguration();
Constructor<?> ctor;
Subject subject;
try {
Class<?> clazz = Class.forName("org.apache.hadoop.security.User");
ctor = clazz.getConstructor(String.class);
ctor.setAccessible(true);
} catch (ClassNotFoundException | NoSuchMethodException e) {
throw new RuntimeException(e);
}
try {
Principal principal = (Principal) ctor.newInstance(System.getProperty("user.name"));
subject = new Subject(false, Collections.singleton(principal),
Collections.emptySet(), Collections.emptySet());
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
// disable file system cache
cfg.setBoolean("fs.hdfs.impl.disable.cache", true);
// set file system to TestingFs to avoid a bunch of security
// checks, similar to what is done in HdfsTests.java
cfg.set(String.format("fs.AbstractFileSystem.%s.impl", uri.getScheme()),
TestingFs.class.getName());
// create the FileContext with our user
return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
@Override
public FileContext run() {
try {
TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
}
});
}
}