diff --git a/docs/reference/modules/snapshots.asciidoc b/docs/reference/modules/snapshots.asciidoc index 38db497c20f..95b6d18f5e7 100644 --- a/docs/reference/modules/snapshots.asciidoc +++ b/docs/reference/modules/snapshots.asciidoc @@ -69,6 +69,8 @@ on all data and master nodes. The following settings are supported: `concurrent_streams`:: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5` `chunk_size`:: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. 1g, 10m, 5k. Defaults to `null` (unlimited chunk size). +`max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to `20mb` per second. +`max_snapshot_bytes_per_sec`:: Throttles per node snapshot rate. Defaults to `20mb` per second. [float] diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index 99a88939090..215ff34d663 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.IOUtils; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.common.blobstore.*; @@ -47,6 +48,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.repositories.RepositoryName; import java.io.IOException; +import java.io.InputStream; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -71,6 +73,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements private final IndicesService indicesService; + private RateLimiter snapshotRateLimiter; + + private RateLimiter restoreRateLimiter; + + private RateLimiterListener rateLimiterListener; + + private RateLimitingInputStream.Listener snapshotThrottleListener; + private static final String SNAPSHOT_PREFIX = "snapshot-"; @Inject @@ -87,10 +97,21 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * @param basePath base path to blob store * @param chunkSize chunk size */ - public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize) { + public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize, + RateLimiter snapshotRateLimiter, RateLimiter restoreRateLimiter, + final RateLimiterListener rateLimiterListener) { this.blobStore = blobStore; this.basePath = basePath; this.chunkSize = chunkSize; + this.snapshotRateLimiter = snapshotRateLimiter; + this.restoreRateLimiter = restoreRateLimiter; + this.rateLimiterListener = rateLimiterListener; + this.snapshotThrottleListener = new RateLimitingInputStream.Listener() { + @Override + public void onPause(long nanos) { + rateLimiterListener.onSnapshotPause(nanos); + } + }; } /** @@ -469,10 +490,17 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements try { indexInput = store.openInputRaw(fileInfo.physicalName(), IOContext.READONCE); indexInput.seek(i * fileInfo.partBytes()); - InputStreamIndexInput is = new ThreadSafeInputStreamIndexInput(indexInput, fileInfo.partBytes()); + InputStreamIndexInput inputStreamIndexInput = new ThreadSafeInputStreamIndexInput(indexInput, fileInfo.partBytes()); final IndexInput fIndexInput = indexInput; - blobContainer.writeBlob(fileInfo.partName(i), is, is.actualSizeToRead(), new ImmutableBlobContainer.WriterListener() { + long size = inputStreamIndexInput.actualSizeToRead(); + InputStream inputStream; + if (snapshotRateLimiter != null) { + inputStream = new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener); + } else { + inputStream = inputStreamIndexInput; + } + blobContainer.writeBlob(fileInfo.partName(i), inputStream, size, new ImmutableBlobContainer.WriterListener() { @Override public void onCompleted() { IOUtils.closeWhileHandlingException(fIndexInput); @@ -683,6 +711,9 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements public synchronized void onPartial(byte[] data, int offset, int size) throws IOException { recoveryStatus.index().addCurrentFilesSize(size); indexOutput.writeBytes(data, offset, size); + if (restoreRateLimiter != null) { + rateLimiterListener.onRestorePause(restoreRateLimiter.pause(size)); + } } @Override @@ -720,4 +751,10 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } + public interface RateLimiterListener { + void onRestorePause(long nanos); + + void onSnapshotPause(long nanos); + } + } diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java new file mode 100644 index 00000000000..71d1e05a13a --- /dev/null +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.snapshots.blobstore; + +import org.apache.lucene.store.RateLimiter; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Rate limiting wrapper for InputStream + */ +public class RateLimitingInputStream extends InputStream { + + private final InputStream delegate; + + private final RateLimiter rateLimiter; + + private final Listener listener; + + public interface Listener { + void onPause(long nanos); + } + + public RateLimitingInputStream(InputStream delegate, RateLimiter rateLimiter, Listener listener) { + this.delegate = delegate; + this.rateLimiter = rateLimiter; + this.listener = listener; + } + + @Override + public int read() throws IOException { + int b = delegate.read(); + long pause = rateLimiter.pause(1); + if (pause > 0) { + listener.onPause(pause); + } + return b; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int n = delegate.read(b, off, len); + if (n > 0) { + listener.onPause(rateLimiter.pause(n)); + } + return n; + } + + @Override + public long skip(long n) throws IOException { + return delegate.skip(n); + } + + @Override + public int available() throws IOException { + return delegate.available(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public void mark(int readlimit) { + delegate.mark(readlimit); + } + + @Override + public void reset() throws IOException { + delegate.reset(); + } + + @Override + public boolean markSupported() { + return delegate.markSupported(); + } +} diff --git a/src/main/java/org/elasticsearch/repositories/Repository.java b/src/main/java/org/elasticsearch/repositories/Repository.java index b6864f8c4e5..44c87626211 100644 --- a/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/src/main/java/org/elasticsearch/repositories/Repository.java @@ -97,4 +97,16 @@ public interface Repository extends LifecycleComponent { * @param snapshotId snapshot id */ void deleteSnapshot(SnapshotId snapshotId); + + /** + * Returns snapshot throttle time in nanoseconds + */ + long snapshotThrottleTimeInNanos(); + + /** + * Returns restore throttle time in nanoseconds + */ + long restoreThrottleTimeInNanos(); + + } diff --git a/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index b5f05837a04..5e663c25dbb 100644 --- a/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -22,6 +22,7 @@ package org.elasticsearch.repositories.blobstore; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -35,11 +36,14 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository.RateLimiterListener; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositorySettings; @@ -95,7 +99,7 @@ import static com.google.common.collect.Lists.newArrayList; * } * */ -public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository { +public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository, RateLimiterListener { private ImmutableBlobContainer snapshotsBlobContainer; @@ -111,6 +115,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent globalOnlyParams = Maps.newHashMap(); globalOnlyParams.put(MetaData.GLOBAL_PERSISTENT_ONLY_PARAM, "true"); globalOnlyFormatParams = new ToXContent.MapParams(globalOnlyParams); + snapshotRateLimiter = getRateLimiter(repositorySettings, "max_snapshot_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB)); + restoreRateLimiter = getRateLimiter(repositorySettings, "max_restore_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB)); } /** @@ -133,7 +148,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent creating repository"); + File repositoryLocation = newTempDir(LifecycleScope.SUITE); + boolean throttleSnapshot = randomBoolean(); + boolean throttleRestore = randomBoolean(); + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(ImmutableSettings.settingsBuilder() + .put("location", repositoryLocation) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000)) + .put("max_restore_bytes_per_sec", throttleRestore ? "2.5k" : "0") + .put("max_snapshot_bytes_per_sec", throttleSnapshot ? "2.5k" : "0") + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + createIndex("test-idx"); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + logger.info("--> delete index"); + wipeIndices("test-idx"); + + logger.info("--> restore index"); + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + ensureGreen(); + assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); + + long snapshotPause = 0L; + long restorePause = 0L; + for (RepositoriesService repositoriesService : cluster().getInstances(RepositoriesService.class)) { + snapshotPause += repositoriesService.repository("test-repo").snapshotThrottleTimeInNanos(); + restorePause += repositoriesService.repository("test-repo").restoreThrottleTimeInNanos(); + } + + if (throttleSnapshot) { + assertThat(snapshotPause, greaterThan(0L)); + } else { + assertThat(snapshotPause, equalTo(0L)); + } + + if (throttleRestore) { + assertThat(restorePause, greaterThan(0L)); + } else { + assertThat(restorePause, equalTo(0L)); + } + } + private boolean waitForIndex(String index, TimeValue timeout) throws InterruptedException { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < timeout.millis()) {