Do not create directories if repository is readonly (#26909)
For FsBlobStore and HdfsBlobStore, if the repository is read only, the blob store should be aware of the readonly setting and do not create directories if they don't exist. Closes #21495
This commit is contained in:
parent
9abc26ee92
commit
0f21262b36
|
@ -39,10 +39,15 @@ public class FsBlobStore extends AbstractComponent implements BlobStore {
|
||||||
|
|
||||||
private final int bufferSizeInBytes;
|
private final int bufferSizeInBytes;
|
||||||
|
|
||||||
|
private final boolean readOnly;
|
||||||
|
|
||||||
public FsBlobStore(Settings settings, Path path) throws IOException {
|
public FsBlobStore(Settings settings, Path path) throws IOException {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.path = path;
|
this.path = path;
|
||||||
Files.createDirectories(path);
|
this.readOnly = settings.getAsBoolean("readonly", false);
|
||||||
|
if (!this.readOnly) {
|
||||||
|
Files.createDirectories(path);
|
||||||
|
}
|
||||||
this.bufferSizeInBytes = (int) settings.getAsBytesSize("repositories.fs.buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).getBytes();
|
this.bufferSizeInBytes = (int) settings.getAsBytesSize("repositories.fs.buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).getBytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,7 +85,9 @@ public class FsBlobStore extends AbstractComponent implements BlobStore {
|
||||||
|
|
||||||
private synchronized Path buildAndCreate(BlobPath path) throws IOException {
|
private synchronized Path buildAndCreate(BlobPath path) throws IOException {
|
||||||
Path f = buildPath(path);
|
Path f = buildPath(path);
|
||||||
Files.createDirectories(f);
|
if (!readOnly) {
|
||||||
|
Files.createDirectories(f);
|
||||||
|
}
|
||||||
return f;
|
return f;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,12 +20,14 @@ package org.elasticsearch.common.blobstore;
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
|
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
|
||||||
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.repositories.ESBlobStoreTestCase;
|
import org.elasticsearch.repositories.ESBlobStoreTestCase;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
|
||||||
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
|
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
|
||||||
|
@ -35,4 +37,39 @@ public class FsBlobStoreTests extends ESBlobStoreTestCase {
|
||||||
Settings settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
|
Settings settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
|
||||||
return new FsBlobStore(settings, tempDir);
|
return new FsBlobStore(settings, tempDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testReadOnly() throws Exception {
|
||||||
|
Settings settings = Settings.builder().put("readonly", true).build();
|
||||||
|
Path tempDir = createTempDir();
|
||||||
|
Path path = tempDir.resolve("bar");
|
||||||
|
|
||||||
|
try (FsBlobStore store = new FsBlobStore(settings, path)) {
|
||||||
|
assertFalse(Files.exists(path));
|
||||||
|
BlobPath blobPath = BlobPath.cleanPath().add("foo");
|
||||||
|
store.blobContainer(blobPath);
|
||||||
|
Path storePath = store.path();
|
||||||
|
for (String d : blobPath) {
|
||||||
|
storePath = storePath.resolve(d);
|
||||||
|
}
|
||||||
|
assertFalse(Files.exists(storePath));
|
||||||
|
}
|
||||||
|
|
||||||
|
settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("readonly", false).build();
|
||||||
|
try (FsBlobStore store = new FsBlobStore(settings, path)) {
|
||||||
|
assertTrue(Files.exists(path));
|
||||||
|
BlobPath blobPath = BlobPath.cleanPath().add("foo");
|
||||||
|
BlobContainer container = store.blobContainer(blobPath);
|
||||||
|
Path storePath = store.path();
|
||||||
|
for (String d : blobPath) {
|
||||||
|
storePath = storePath.resolve(d);
|
||||||
|
}
|
||||||
|
assertTrue(Files.exists(storePath));
|
||||||
|
assertTrue(Files.isDirectory(storePath));
|
||||||
|
|
||||||
|
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||||
|
writeBlob(container, "test", new BytesArray(data));
|
||||||
|
assertArrayEquals(readBlobFully(container, "test", data.length), data);
|
||||||
|
assertTrue(container.blobExists("test"));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,17 +39,21 @@ final class HdfsBlobStore implements BlobStore {
|
||||||
private final FileContext fileContext;
|
private final FileContext fileContext;
|
||||||
private final HdfsSecurityContext securityContext;
|
private final HdfsSecurityContext securityContext;
|
||||||
private final int bufferSize;
|
private final int bufferSize;
|
||||||
|
private final boolean readOnly;
|
||||||
private volatile boolean closed;
|
private volatile boolean closed;
|
||||||
|
|
||||||
HdfsBlobStore(FileContext fileContext, String path, int bufferSize) throws IOException {
|
HdfsBlobStore(FileContext fileContext, String path, int bufferSize, boolean readOnly) throws IOException {
|
||||||
this.fileContext = fileContext;
|
this.fileContext = fileContext;
|
||||||
this.securityContext = new HdfsSecurityContext(fileContext.getUgi());
|
this.securityContext = new HdfsSecurityContext(fileContext.getUgi());
|
||||||
this.bufferSize = bufferSize;
|
this.bufferSize = bufferSize;
|
||||||
this.root = execute(fileContext1 -> fileContext1.makeQualified(new Path(path)));
|
this.root = execute(fileContext1 -> fileContext1.makeQualified(new Path(path)));
|
||||||
try {
|
this.readOnly = readOnly;
|
||||||
mkdirs(root);
|
if (!readOnly) {
|
||||||
} catch (FileAlreadyExistsException ok) {
|
try {
|
||||||
// behaves like Files.createDirectories
|
mkdirs(root);
|
||||||
|
} catch (FileAlreadyExistsException ok) {
|
||||||
|
// behaves like Files.createDirectories
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,12 +84,14 @@ final class HdfsBlobStore implements BlobStore {
|
||||||
|
|
||||||
private Path buildHdfsPath(BlobPath blobPath) {
|
private Path buildHdfsPath(BlobPath blobPath) {
|
||||||
final Path path = translateToHdfsPath(blobPath);
|
final Path path = translateToHdfsPath(blobPath);
|
||||||
try {
|
if (!readOnly) {
|
||||||
mkdirs(path);
|
try {
|
||||||
} catch (FileAlreadyExistsException ok) {
|
mkdirs(path);
|
||||||
// behaves like Files.createDirectories
|
} catch (FileAlreadyExistsException ok) {
|
||||||
} catch (IOException ex) {
|
// behaves like Files.createDirectories
|
||||||
throw new ElasticsearchException("failed to create blob container", ex);
|
} catch (IOException ex) {
|
||||||
|
throw new ElasticsearchException("failed to create blob container", ex);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return path;
|
return path;
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,7 @@ public final class HdfsRepository extends BlobStoreRepository {
|
||||||
SpecialPermission.check();
|
SpecialPermission.check();
|
||||||
FileContext fileContext = AccessController.doPrivileged((PrivilegedAction<FileContext>)
|
FileContext fileContext = AccessController.doPrivileged((PrivilegedAction<FileContext>)
|
||||||
() -> createContext(uri, getMetadata().settings()));
|
() -> createContext(uri, getMetadata().settings()));
|
||||||
blobStore = new HdfsBlobStore(fileContext, pathSetting, bufferSize);
|
blobStore = new HdfsBlobStore(fileContext, pathSetting, bufferSize, isReadOnly());
|
||||||
logger.debug("Using file-system [{}] for URI [{}], path [{}]", fileContext.getDefaultFileSystem(), fileContext.getDefaultFileSystem().getUri(), pathSetting);
|
logger.debug("Using file-system [{}] for URI [{}], path [{}]", fileContext.getDefaultFileSystem(), fileContext.getDefaultFileSystem().getUri(), pathSetting);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new UncheckedIOException(String.format(Locale.ROOT, "Cannot create HDFS repository for uri [%s]", uri), e);
|
throw new UncheckedIOException(String.format(Locale.ROOT, "Cannot create HDFS repository for uri [%s]", uri), e);
|
||||||
|
|
|
@ -19,6 +19,20 @@
|
||||||
|
|
||||||
package org.elasticsearch.repositories.hdfs;
|
package org.elasticsearch.repositories.hdfs;
|
||||||
|
|
||||||
|
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.AbstractFileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
import org.elasticsearch.common.SuppressForbidden;
|
||||||
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
|
import org.elasticsearch.common.blobstore.BlobStore;
|
||||||
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
|
||||||
|
|
||||||
|
import javax.security.auth.Subject;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
@ -29,22 +43,20 @@ import java.security.PrivilegedAction;
|
||||||
import java.security.PrivilegedActionException;
|
import java.security.PrivilegedActionException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import javax.security.auth.Subject;
|
|
||||||
|
|
||||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import static org.elasticsearch.repositories.ESBlobStoreTestCase.readBlobFully;
|
||||||
import org.apache.hadoop.fs.AbstractFileSystem;
|
|
||||||
import org.apache.hadoop.fs.FileContext;
|
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
||||||
import org.elasticsearch.common.SuppressForbidden;
|
|
||||||
import org.elasticsearch.common.blobstore.BlobStore;
|
|
||||||
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
|
|
||||||
|
|
||||||
@ThreadLeakFilters(filters = {HdfsClientThreadLeakFilter.class})
|
@ThreadLeakFilters(filters = {HdfsClientThreadLeakFilter.class})
|
||||||
public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected BlobStore newBlobStore() throws IOException {
|
protected BlobStore newBlobStore() throws IOException {
|
||||||
|
return new HdfsBlobStore(createTestContext(), "temp", 1024, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private FileContext createTestContext() {
|
||||||
FileContext fileContext;
|
FileContext fileContext;
|
||||||
try {
|
try {
|
||||||
fileContext = AccessController.doPrivileged((PrivilegedExceptionAction<FileContext>)
|
fileContext = AccessController.doPrivileged((PrivilegedExceptionAction<FileContext>)
|
||||||
|
@ -52,7 +64,7 @@ public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
||||||
} catch (PrivilegedActionException e) {
|
} catch (PrivilegedActionException e) {
|
||||||
throw new RuntimeException(e.getCause());
|
throw new RuntimeException(e.getCause());
|
||||||
}
|
}
|
||||||
return new HdfsBlobStore(fileContext, "temp", 1024);
|
return fileContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)")
|
@SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)")
|
||||||
|
@ -69,7 +81,7 @@ public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
||||||
Class<?> clazz = Class.forName("org.apache.hadoop.security.User");
|
Class<?> clazz = Class.forName("org.apache.hadoop.security.User");
|
||||||
ctor = clazz.getConstructor(String.class);
|
ctor = clazz.getConstructor(String.class);
|
||||||
ctor.setAccessible(true);
|
ctor.setAccessible(true);
|
||||||
} catch (ClassNotFoundException | NoSuchMethodException e) {
|
} catch (ClassNotFoundException | NoSuchMethodException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,4 +110,33 @@ public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testReadOnly() throws Exception {
|
||||||
|
FileContext fileContext = createTestContext();
|
||||||
|
// Constructor will not create dir if read only
|
||||||
|
HdfsBlobStore hdfsBlobStore = new HdfsBlobStore(fileContext, "dir", 1024, true);
|
||||||
|
FileContext.Util util = fileContext.util();
|
||||||
|
Path root = fileContext.makeQualified(new Path("dir"));
|
||||||
|
assertFalse(util.exists(root));
|
||||||
|
BlobPath blobPath = BlobPath.cleanPath().add("path");
|
||||||
|
|
||||||
|
// blobContainer() will not create path if read only
|
||||||
|
hdfsBlobStore.blobContainer(blobPath);
|
||||||
|
Path hdfsPath = root;
|
||||||
|
for (String p : blobPath) {
|
||||||
|
hdfsPath = new Path(hdfsPath, p);
|
||||||
|
}
|
||||||
|
assertFalse(util.exists(hdfsPath));
|
||||||
|
|
||||||
|
// if not read only, directory will be created
|
||||||
|
hdfsBlobStore = new HdfsBlobStore(fileContext, "dir", 1024, false);
|
||||||
|
assertTrue(util.exists(root));
|
||||||
|
BlobContainer container = hdfsBlobStore.blobContainer(blobPath);
|
||||||
|
assertTrue(util.exists(hdfsPath));
|
||||||
|
|
||||||
|
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||||
|
writeBlob(container, "foo", new BytesArray(data));
|
||||||
|
assertArrayEquals(readBlobFully(container, "foo", data.length), data);
|
||||||
|
assertTrue(container.blobExists("foo"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,7 +142,7 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray) throws IOException {
|
protected void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray) throws IOException {
|
||||||
try (InputStream stream = bytesArray.streamInput()) {
|
try (InputStream stream = bytesArray.streamInput()) {
|
||||||
container.writeBlob(blobName, stream, bytesArray.length());
|
container.writeBlob(blobName, stream, bytesArray.length());
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,7 +78,7 @@ public abstract class ESBlobStoreTestCase extends ESTestCase {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException {
|
protected static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException {
|
||||||
try (InputStream stream = bytesArray.streamInput()) {
|
try (InputStream stream = bytesArray.streamInput()) {
|
||||||
container.writeBlob(blobName, stream, bytesArray.length());
|
container.writeBlob(blobName, stream, bytesArray.length());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue