Formalize and Streamline Buffer Sizes used by Repositories (#59771) (#60051)

Due to complicated access checks (reads and writes execute in their own access context) on some repositories (GCS, Azure, HDFS), using a hard-coded buffer size of 4k for restores was needlessly inefficient.
By the same token, the use of stream copying with the default 8k buffer size for blob writes was inefficient as well.

We also had dedicated, undocumented buffer size settings for HDFS and FS repositories. For these two we would use a 100k buffer by default. We did not have such a setting for e.g. GCS though, which would only use an 8k read buffer which is needlessly small for reading from a raw `URLConnection`.

This commit adds an undocumented setting that sets the default buffer size to `128k` for all repositories. It removes wasteful allocation of such a large buffer for small writes and reads in case of HDFS and FS repositories (i.e. still using the smaller buffer to write metadata) but uses a large buffer for doing restores and uploading segment blobs.

This should speed up Azure and GCS restores and snapshots in a non-trivial way, as well as save some memory when reading small blobs on FS and HDFS repositories.
This commit is contained in:
Armin Braun 2020-07-22 21:06:31 +02:00 committed by GitHub
parent 9ba017f699
commit ebb6677815
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 77 additions and 72 deletions

View File

@ -43,9 +43,9 @@ import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.internal.io.Streams;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -101,16 +101,19 @@ class GoogleCloudStorageBlobStore implements BlobStore {
private final String repositoryName;
private final GoogleCloudStorageService storageService;
private final GoogleCloudStorageOperationsStats stats;
private final int bufferSize;
GoogleCloudStorageBlobStore(String bucketName,
String clientName,
String repositoryName,
GoogleCloudStorageService storageService) {
GoogleCloudStorageService storageService,
int bufferSize) {
this.bucketName = bucketName;
this.clientName = clientName;
this.repositoryName = repositoryName;
this.storageService = storageService;
this.stats = new GoogleCloudStorageOperationsStats(bucketName);
this.bufferSize = bufferSize;
if (doesBucketExist(bucketName) == false) {
throw new BlobStoreException("Bucket [" + bucketName + "] does not exist");
}
@ -126,7 +129,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
}
@Override
public void close() throws IOException {
public void close() {
storageService.closeRepositoryClient(repositoryName);
}
@ -252,7 +255,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
if (blobSize > getLargeBlobThresholdInBytes()) {
writeBlobResumable(blobInfo, inputStream, failIfAlreadyExists);
writeBlobResumable(blobInfo, inputStream, blobSize, failIfAlreadyExists);
} else {
writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists);
}
@ -269,13 +272,16 @@ class GoogleCloudStorageBlobStore implements BlobStore {
* https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
* @param blobInfo the info for the blob to be uploaded
* @param inputStream the stream containing the blob data
* @param size expected size of the blob to be written
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long size, boolean failIfAlreadyExists)
throws IOException {
// We retry 410 GONE errors to cover the unlikely but possible scenario where a resumable upload session becomes broken and
// needs to be restarted from scratch. Given how unlikely a 410 error should be according to SLAs we retry only twice.
assert inputStream.markSupported();
inputStream.mark(Integer.MAX_VALUE);
final byte[] buffer = new byte[size < bufferSize ? Math.toIntExact(size) : bufferSize];
StorageException storageException = null;
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
@ -303,8 +309,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
}
}));
}), buffer);
// We don't track this operation on the http layer as
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one

View File

@ -98,7 +98,7 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService);
return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bufferSize);
}
@Override

View File

@ -169,7 +169,8 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
}))
);
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, "repo", service);
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, "repo", service,
randomIntBetween(1, 8) * 1024);
httpContexts.forEach(httpContext -> httpServer.removeContext(httpContext));
return new GoogleCloudStorageBlobContainer(BlobPath.cleanPath(), blobStore);

View File

@ -87,7 +87,8 @@ public class GoogleCloudStorageBlobStoreContainerTests extends ESTestCase {
final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
when(storageService.client(any(String.class), any(String.class), any(GoogleCloudStorageOperationsStats.class))).thenReturn(storage);
try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", "repo", storageService)) {
try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", "repo", storageService,
randomIntBetween(1, 8) * 1024)) {
final BlobContainer container = store.blobContainer(new BlobPath());
IOException e = expectThrows(IOException.class, () -> container.deleteBlobsIgnoringIfNotExists(blobs));

View File

@ -265,7 +265,8 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(
metadata.settings().get("bucket"), "test", metadata.name(), storageService) {
metadata.settings().get("bucket"), "test", metadata.name(), storageService,
randomIntBetween(1, 8) * 1024) {
@Override
long getLargeBlobThresholdInBytes() {
return ByteSizeUnit.MB.toBytes(1);

View File

@ -20,6 +20,7 @@ package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.CreateOpts;
@ -111,23 +112,19 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
}
@Override
public InputStream readBlob(String blobName, long position, long length) throws IOException {
public InputStream readBlob(String blobName, long position, long length) {
throw new UnsupportedOperationException();
}
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
final EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
: EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
store.execute((Operation<Void>) fileContext -> {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
: EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
try (FSDataOutputStream stream = fileContext.create(blob, flags, CreateOpts.bufferSize(bufferSize))) {
int bytesRead;
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
}
try {
writeToPath(inputStream, blobSize, fileContext, blob, flags);
} catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
}
@ -138,17 +135,10 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
final String tempBlob = FsBlobContainer.tempBlobName(blobName);
final Path tempBlobPath = new Path(path, tempBlob);
final Path blob = new Path(path, blobName);
store.execute((Operation<Void>) fileContext -> {
final Path tempBlobPath = new Path(path, tempBlob);
try (FSDataOutputStream stream = fileContext.create(
tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK), CreateOpts.bufferSize(bufferSize))) {
int bytesRead;
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
}
}
final Path blob = new Path(path, blobName);
writeToPath(inputStream, blobSize, fileContext, tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK));
try {
fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE);
} catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
@ -158,6 +148,17 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
});
}
/**
 * Copies the full contents of {@code inputStream} to {@code blobPath} via the given Hadoop
 * {@link FileContext}.
 *
 * @param inputStream source of the blob's bytes
 * @param blobSize    expected size of the blob in bytes, used to right-size the copy buffer
 *                    so small blobs do not allocate the full repository buffer
 * @param fileContext Hadoop file context on which the output stream is created
 * @param blobPath    target path to write the blob to
 * @param createFlags create semantics (e.g. fail-if-exists, sync-on-block)
 * @throws IOException on any I/O failure, including Hadoop's FileAlreadyExistsException when
 *                     {@code createFlags} forbids overwriting an existing blob
 */
private void writeToPath(InputStream inputStream, long blobSize, FileContext fileContext, Path blobPath,
                         EnumSet<CreateFlag> createFlags) throws IOException {
    // Cap the buffer at the blob's size, but never allocate a zero-length buffer:
    // InputStream#read(byte[]) returns 0 (not -1) for an empty array, which would make the
    // copy loop below spin forever on a zero-sized blob.
    final byte[] buffer = new byte[blobSize < bufferSize ? Math.toIntExact(Math.max(1L, blobSize)) : bufferSize];
    try (FSDataOutputStream stream = fileContext.create(blobPath, createFlags, CreateOpts.bufferSize(buffer.length))) {
        int bytesRead;
        while ((bytesRead = inputStream.read(buffer)) != -1) {
            stream.write(buffer, 0, bytesRead);
        }
    }
}
@Override
public Map<String, BlobMetadata> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path,

View File

@ -36,7 +36,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
@ -64,10 +63,6 @@ public final class HdfsRepository extends BlobStoreRepository {
private final URI uri;
private final String pathSetting;
// buffer size passed to HDFS read/write methods
// TODO: why 100KB?
private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB);
public HdfsRepository(
final RepositoryMetadata metadata,
final Environment environment,
@ -128,8 +123,6 @@ public final class HdfsRepository extends BlobStoreRepository {
Class<?> ret = hadoopConfiguration.getClass(configKey, null, FailoverProxyProvider.class);
boolean haEnabled = ret != null;
int bufferSize = repositorySettings.getAsBytesSize("buffer_size", DEFAULT_BUFFER_SIZE).bytesAsInt();
// Create the filecontext with our user information
// This will correctly configure the filecontext to have our UGI as its internal user.
FileContext fileContext = ugi.doAs((PrivilegedAction<FileContext>) () -> {

View File

@ -105,7 +105,7 @@ public class FsBlobStoreRepositoryIT extends ESBlobStoreRepositoryIntegTestCase
Path tempDir = createTempDir();
Path path = tempDir.resolve("bar");
try (FsBlobStore store = new FsBlobStore(Settings.EMPTY, path, true)) {
try (FsBlobStore store = new FsBlobStore(randomIntBetween(1, 8) * 1024, path, true)) {
assertFalse(Files.exists(path));
BlobPath blobPath = BlobPath.cleanPath().add("foo");
store.blobContainer(blobPath);
@ -116,7 +116,7 @@ public class FsBlobStoreRepositoryIT extends ESBlobStoreRepositoryIntegTestCase
assertFalse(Files.exists(storePath));
}
try (FsBlobStore store = new FsBlobStore(Settings.EMPTY, path, false)) {
try (FsBlobStore store = new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false)) {
assertTrue(Files.exists(path));
BlobPath blobPath = BlobPath.cleanPath().add("foo");
BlobContainer container = store.blobContainer(blobPath);

View File

@ -26,10 +26,9 @@ import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.io.Streams;
import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@ -144,10 +143,6 @@ public class FsBlobContainer extends AbstractBlobContainer {
IOUtils.rm(blobNames.stream().map(path::resolve).toArray(Path[]::new));
}
private InputStream bufferedInputStream(InputStream inputStream) {
return new BufferedInputStream(inputStream, blobStore.bufferSizeInBytes());
}
@Override
public boolean blobExists(String blobName) {
return Files.exists(path.resolve(blobName));
@ -157,7 +152,7 @@ public class FsBlobContainer extends AbstractBlobContainer {
public InputStream readBlob(String name) throws IOException {
final Path resolvedPath = path.resolve(name);
try {
return bufferedInputStream(Files.newInputStream(resolvedPath));
return Files.newInputStream(resolvedPath);
} catch (FileNotFoundException fnfe) {
throw new NoSuchFileException("[" + name + "] blob not found");
}
@ -170,7 +165,7 @@ public class FsBlobContainer extends AbstractBlobContainer {
channel.position(position);
}
assert channel.position() == position;
return bufferedInputStream(org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length));
return org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length);
}
@Override
@ -185,10 +180,7 @@ public class FsBlobContainer extends AbstractBlobContainer {
deleteBlobsIgnoringIfNotExists(Collections.singletonList(blobName));
}
final Path file = path.resolve(blobName);
try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
Streams.copy(inputStream, outputStream);
}
IOUtils.fsync(file, false);
writeToPath(inputStream, file, blobSize);
IOUtils.fsync(path, true);
}
@ -198,10 +190,7 @@ public class FsBlobContainer extends AbstractBlobContainer {
final String tempBlob = tempBlobName(blobName);
final Path tempBlobPath = path.resolve(tempBlob);
try {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
Streams.copy(inputStream, outputStream);
}
IOUtils.fsync(tempBlobPath, false);
writeToPath(inputStream, tempBlobPath, blobSize);
moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists);
} catch (IOException ex) {
try {
@ -215,6 +204,14 @@ public class FsBlobContainer extends AbstractBlobContainer {
}
}
/**
 * Writes the full contents of {@code inputStream} to {@code tempBlobPath} and fsyncs the
 * resulting file. The target must not already exist ({@link StandardOpenOption#CREATE_NEW}).
 *
 * @param inputStream  source of the blob's bytes
 * @param tempBlobPath target file to create and write to
 * @param blobSize     expected size of the blob in bytes, used to right-size the copy buffer
 *                     so small blobs do not allocate the full repository buffer
 * @throws IOException if the target already exists or on any other I/O failure
 */
private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
    try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
        final int bufferSize = blobStore.bufferSizeInBytes();
        // Never hand Streams.copy a zero-length buffer: a 0-byte array cannot make progress
        // and InputStream#read(byte[]) returns 0 (not -1) for an empty array, so a zero-sized
        // blob could otherwise loop forever.
        Streams.copy(inputStream, outputStream,
            new byte[blobSize < bufferSize ? Math.toIntExact(Math.max(1L, blobSize)) : bufferSize]);
    }
    // fsync the file itself; the parent directory is fsynced by the caller after the final rename
    IOUtils.fsync(tempBlobPath, false);
}
public void moveBlobAtomic(final String sourceBlobName, final String targetBlobName, final boolean failIfAlreadyExists)
throws IOException {
final Path sourceBlobPath = path.resolve(sourceBlobName);

View File

@ -23,9 +23,6 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.IOException;
import java.nio.file.Files;
@ -39,14 +36,13 @@ public class FsBlobStore implements BlobStore {
private final boolean readOnly;
public FsBlobStore(Settings settings, Path path, boolean readonly) throws IOException {
public FsBlobStore(int bufferSizeInBytes, Path path, boolean readonly) throws IOException {
this.path = path;
this.readOnly = readonly;
if (this.readOnly == false) {
Files.createDirectories(path);
}
this.bufferSizeInBytes = (int) settings.getAsBytesSize("repositories.fs.buffer_size",
new ByteSizeValue(100, ByteSizeUnit.KB)).getBytes();
this.bufferSizeInBytes = bufferSizeInBytes;
}
@Override

View File

@ -162,8 +162,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
protected final ThreadPool threadPool;
private static final int BUFFER_SIZE = 4096;
public static final String SNAPSHOT_PREFIX = "snap-";
public static final String INDEX_FILE_PREFIX = "index-";
@ -206,6 +204,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
public static final Setting<Boolean> CACHE_REPOSITORY_DATA =
Setting.boolSetting("cache_repository_data", true, Setting.Property.Deprecated);
/**
* Size hint for the IO buffer size to use when reading from and writing to the repository.
*/
public static final Setting<ByteSizeValue> BUFFER_SIZE_SETTING = Setting.byteSizeSetting("io_buffer_size",
ByteSizeValue.parseBytesSizeValue("128kb", "io_buffer_size"), ByteSizeValue.parseBytesSizeValue("8kb", "buffer_size"),
ByteSizeValue.parseBytesSizeValue("16mb", "io_buffer_size"), Setting.Property.NodeScope);
private final boolean compress;
private final boolean cacheRepositoryData;
@ -277,6 +282,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
private volatile boolean bestEffortConsistency;
/**
* IO buffer size hint for reading and writing to the underlying blob store.
*/
protected final int bufferSize;
/**
* Constructs new BlobStoreRepository
* @param metadata The metadata for this repository including name and settings
@ -298,6 +308,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO);
readOnly = metadata.settings().getAsBoolean("readonly", false);
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
}
@Override
@ -2059,7 +2070,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return container.readBlob(fileInfo.partName(slice));
}
})) {
final byte[] buffer = new byte[BUFFER_SIZE];
final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileInfo.length()))];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);

View File

@ -117,7 +117,7 @@ public class FsRepository extends BlobStoreRepository {
protected BlobStore createBlobStore() throws Exception {
final String location = REPOSITORIES_LOCATION_SETTING.get(getMetadata().settings());
final Path locationFile = environment.resolveRepoFile(location);
return new FsBlobStore(environment.settings(), locationFile, isReadOnly());
return new FsBlobStore(bufferSize, locationFile, isReadOnly());
}
@Override

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.io.PathUtilsForTesting;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
@ -77,7 +76,8 @@ public class FsBlobContainerTests extends ESTestCase {
final Path path = PathUtils.get(createTempDir().toString());
Files.write(path.resolve(blobName), blobData);
final FsBlobContainer container = new FsBlobContainer(new FsBlobStore(Settings.EMPTY, path, false), BlobPath.cleanPath(), path);
final FsBlobContainer container =
new FsBlobContainer(new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false), BlobPath.cleanPath(), path);
assertThat(totalBytesRead.get(), equalTo(0L));
final long start = randomLongBetween(0L, Math.max(0L, blobData.length - 1));

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -149,7 +148,7 @@ public class BlobStoreFormatTests extends ESTestCase {
}
protected BlobStore createTestBlobStore() throws IOException {
return new FsBlobStore(Settings.EMPTY, createTempDir(), false);
return new FsBlobStore(randomIntBetween(1, 8) * 1024, createTempDir(), false);
}
protected void randomCorruption(BlobContainer blobContainer, String blobName) throws IOException {

View File

@ -629,7 +629,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot("_snapshot", 0L, randomFiles, 0L, 0L, 0, 0L);
final BlobContainer blobContainer = new FsBlobContainer(
new FsBlobStore(Settings.EMPTY, shardSnapshotDir, true),
new FsBlobStore(randomIntBetween(1, 8) * 1024, shardSnapshotDir, true),
BlobPath.cleanPath(),
shardSnapshotDir
);