parent
0541456b34
commit
2755eecf65
|
@ -69,6 +69,8 @@ on all data and master nodes. The following settings are supported:
|
|||
`concurrent_streams`:: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5`
|
||||
`chunk_size`:: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by
|
||||
using size value notation, i.e. 1g, 10m, 5k. Defaults to `null` (unlimited chunk size).
|
||||
`max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to `20mb` per second.
|
||||
`max_snapshot_bytes_per_sec`:: Throttles per node snapshot rate. Defaults to `20mb` per second.
|
||||
|
||||
|
||||
[float]
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
|
|||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.apache.lucene.store.RateLimiter;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.cluster.metadata.SnapshotId;
|
||||
import org.elasticsearch.common.blobstore.*;
|
||||
|
@ -47,6 +48,7 @@ import org.elasticsearch.indices.IndicesService;
|
|||
import org.elasticsearch.repositories.RepositoryName;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
@ -71,6 +73,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
private RateLimiter snapshotRateLimiter;
|
||||
|
||||
private RateLimiter restoreRateLimiter;
|
||||
|
||||
private RateLimiterListener rateLimiterListener;
|
||||
|
||||
private RateLimitingInputStream.Listener snapshotThrottleListener;
|
||||
|
||||
private static final String SNAPSHOT_PREFIX = "snapshot-";
|
||||
|
||||
@Inject
|
||||
|
@ -87,10 +97,21 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
* @param basePath base path to blob store
|
||||
* @param chunkSize chunk size
|
||||
*/
|
||||
public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize) {
|
||||
public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize,
|
||||
RateLimiter snapshotRateLimiter, RateLimiter restoreRateLimiter,
|
||||
final RateLimiterListener rateLimiterListener) {
|
||||
this.blobStore = blobStore;
|
||||
this.basePath = basePath;
|
||||
this.chunkSize = chunkSize;
|
||||
this.snapshotRateLimiter = snapshotRateLimiter;
|
||||
this.restoreRateLimiter = restoreRateLimiter;
|
||||
this.rateLimiterListener = rateLimiterListener;
|
||||
this.snapshotThrottleListener = new RateLimitingInputStream.Listener() {
|
||||
@Override
|
||||
public void onPause(long nanos) {
|
||||
rateLimiterListener.onSnapshotPause(nanos);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -469,10 +490,17 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
try {
|
||||
indexInput = store.openInputRaw(fileInfo.physicalName(), IOContext.READONCE);
|
||||
indexInput.seek(i * fileInfo.partBytes());
|
||||
InputStreamIndexInput is = new ThreadSafeInputStreamIndexInput(indexInput, fileInfo.partBytes());
|
||||
InputStreamIndexInput inputStreamIndexInput = new ThreadSafeInputStreamIndexInput(indexInput, fileInfo.partBytes());
|
||||
|
||||
final IndexInput fIndexInput = indexInput;
|
||||
blobContainer.writeBlob(fileInfo.partName(i), is, is.actualSizeToRead(), new ImmutableBlobContainer.WriterListener() {
|
||||
long size = inputStreamIndexInput.actualSizeToRead();
|
||||
InputStream inputStream;
|
||||
if (snapshotRateLimiter != null) {
|
||||
inputStream = new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener);
|
||||
} else {
|
||||
inputStream = inputStreamIndexInput;
|
||||
}
|
||||
blobContainer.writeBlob(fileInfo.partName(i), inputStream, size, new ImmutableBlobContainer.WriterListener() {
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
IOUtils.closeWhileHandlingException(fIndexInput);
|
||||
|
@ -683,6 +711,9 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
|
||||
recoveryStatus.index().addCurrentFilesSize(size);
|
||||
indexOutput.writeBytes(data, offset, size);
|
||||
if (restoreRateLimiter != null) {
|
||||
rateLimiterListener.onRestorePause(restoreRateLimiter.pause(size));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -720,4 +751,10 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
|
||||
}
|
||||
|
||||
public interface RateLimiterListener {
|
||||
void onRestorePause(long nanos);
|
||||
|
||||
void onSnapshotPause(long nanos);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.snapshots.blobstore;
|
||||
|
||||
import org.apache.lucene.store.RateLimiter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
* Rate limiting wrapper for InputStream
|
||||
*/
|
||||
public class RateLimitingInputStream extends InputStream {
|
||||
|
||||
private final InputStream delegate;
|
||||
|
||||
private final RateLimiter rateLimiter;
|
||||
|
||||
private final Listener listener;
|
||||
|
||||
public interface Listener {
|
||||
void onPause(long nanos);
|
||||
}
|
||||
|
||||
public RateLimitingInputStream(InputStream delegate, RateLimiter rateLimiter, Listener listener) {
|
||||
this.delegate = delegate;
|
||||
this.rateLimiter = rateLimiter;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
int b = delegate.read();
|
||||
long pause = rateLimiter.pause(1);
|
||||
if (pause > 0) {
|
||||
listener.onPause(pause);
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b) throws IOException {
|
||||
return read(b, 0, b.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
int n = delegate.read(b, off, len);
|
||||
if (n > 0) {
|
||||
listener.onPause(rateLimiter.pause(n));
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long skip(long n) throws IOException {
|
||||
return delegate.skip(n);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available() throws IOException {
|
||||
return delegate.available();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
delegate.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void mark(int readlimit) {
|
||||
delegate.mark(readlimit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
delegate.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean markSupported() {
|
||||
return delegate.markSupported();
|
||||
}
|
||||
}
|
|
@ -97,4 +97,16 @@ public interface Repository extends LifecycleComponent<Repository> {
|
|||
* @param snapshotId snapshot id
|
||||
*/
|
||||
void deleteSnapshot(SnapshotId snapshotId);
|
||||
|
||||
/**
|
||||
* Returns snapshot throttle time in nanoseconds
|
||||
*/
|
||||
long snapshotThrottleTimeInNanos();
|
||||
|
||||
/**
|
||||
* Returns restore throttle time in nanoseconds
|
||||
*/
|
||||
long restoreThrottleTimeInNanos();
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.repositories.blobstore;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.lucene.store.RateLimiter;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -35,11 +36,14 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
|||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.snapshots.IndexShardRepository;
|
||||
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
|
||||
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository.RateLimiterListener;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryException;
|
||||
import org.elasticsearch.repositories.RepositorySettings;
|
||||
|
@ -95,7 +99,7 @@ import static com.google.common.collect.Lists.newArrayList;
|
|||
* }
|
||||
* </pre>
|
||||
*/
|
||||
public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Repository> implements Repository {
|
||||
public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Repository> implements Repository, RateLimiterListener {
|
||||
|
||||
private ImmutableBlobContainer snapshotsBlobContainer;
|
||||
|
||||
|
@ -111,6 +115,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
|
||||
private final ToXContent.Params globalOnlyFormatParams;
|
||||
|
||||
private final RateLimiter snapshotRateLimiter;
|
||||
|
||||
private final RateLimiter restoreRateLimiter;
|
||||
|
||||
private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric();
|
||||
|
||||
private final CounterMetric restoreRateLimitingTimeInNanos = new CounterMetric();
|
||||
|
||||
|
||||
/**
|
||||
* Constructs new BlobStoreRepository
|
||||
*
|
||||
|
@ -125,6 +138,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
Map<String, String> globalOnlyParams = Maps.newHashMap();
|
||||
globalOnlyParams.put(MetaData.GLOBAL_PERSISTENT_ONLY_PARAM, "true");
|
||||
globalOnlyFormatParams = new ToXContent.MapParams(globalOnlyParams);
|
||||
snapshotRateLimiter = getRateLimiter(repositorySettings, "max_snapshot_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB));
|
||||
restoreRateLimiter = getRateLimiter(repositorySettings, "max_restore_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -133,7 +148,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
@Override
|
||||
protected void doStart() throws ElasticsearchException {
|
||||
this.snapshotsBlobContainer = blobStore().immutableBlobContainer(basePath());
|
||||
indexShardRepository.initialize(blobStore(), basePath(), chunkSize());
|
||||
indexShardRepository.initialize(blobStore(), basePath(), chunkSize(), snapshotRateLimiter, restoreRateLimiter, this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -394,6 +409,23 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures RateLimiter based on repository and global settings
|
||||
* @param repositorySettings repository settings
|
||||
* @param setting setting to use to configure rate limiter
|
||||
* @param defaultRate default limiting rate
|
||||
* @return rate limiter or null of no throttling is needed
|
||||
*/
|
||||
private RateLimiter getRateLimiter(RepositorySettings repositorySettings, String setting, ByteSizeValue defaultRate) {
|
||||
ByteSizeValue maxSnapshotBytesPerSec = repositorySettings.settings().getAsBytesSize(setting,
|
||||
componentSettings.getAsBytesSize(setting, defaultRate));
|
||||
if (maxSnapshotBytesPerSec.bytes() <= 0) {
|
||||
return null;
|
||||
} else {
|
||||
return new RateLimiter.SimpleRateLimiter(maxSnapshotBytesPerSec.mbFrac());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses JSON containing snapshot description
|
||||
*
|
||||
|
@ -574,4 +606,23 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
return ImmutableList.copyOf(snapshots);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRestorePause(long nanos) {
|
||||
restoreRateLimitingTimeInNanos.inc(nanos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSnapshotPause(long nanos) {
|
||||
snapshotRateLimitingTimeInNanos.inc(nanos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long snapshotThrottleTimeInNanos() {
|
||||
return snapshotRateLimitingTimeInNanos.count();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long restoreThrottleTimeInNanos() {
|
||||
return restoreRateLimitingTimeInNanos.count();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,11 +39,10 @@ import org.elasticsearch.common.settings.ImmutableSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.indices.InvalidIndexNameException;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.store.MockDirectoryHelper;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -882,6 +881,69 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
|
|||
assertThat(getSnapshotsResponse.getSnapshots().size(), equalTo(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void throttlingTest() throws Exception {
|
||||
Client client = client();
|
||||
|
||||
logger.info("--> creating repository");
|
||||
File repositoryLocation = newTempDir(LifecycleScope.SUITE);
|
||||
boolean throttleSnapshot = randomBoolean();
|
||||
boolean throttleRestore = randomBoolean();
|
||||
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put("location", repositoryLocation)
|
||||
.put("compress", randomBoolean())
|
||||
.put("chunk_size", randomIntBetween(100, 1000))
|
||||
.put("max_restore_bytes_per_sec", throttleRestore ? "2.5k" : "0")
|
||||
.put("max_snapshot_bytes_per_sec", throttleSnapshot ? "2.5k" : "0")
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
createIndex("test-idx");
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> indexing some data");
|
||||
for (int i = 0; i < 100; i++) {
|
||||
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
|
||||
}
|
||||
refresh();
|
||||
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
|
||||
|
||||
logger.info("--> snapshot");
|
||||
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
|
||||
logger.info("--> delete index");
|
||||
wipeIndices("test-idx");
|
||||
|
||||
logger.info("--> restore index");
|
||||
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
|
||||
|
||||
ensureGreen();
|
||||
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
|
||||
|
||||
long snapshotPause = 0L;
|
||||
long restorePause = 0L;
|
||||
for (RepositoriesService repositoriesService : cluster().getInstances(RepositoriesService.class)) {
|
||||
snapshotPause += repositoriesService.repository("test-repo").snapshotThrottleTimeInNanos();
|
||||
restorePause += repositoriesService.repository("test-repo").restoreThrottleTimeInNanos();
|
||||
}
|
||||
|
||||
if (throttleSnapshot) {
|
||||
assertThat(snapshotPause, greaterThan(0L));
|
||||
} else {
|
||||
assertThat(snapshotPause, equalTo(0L));
|
||||
}
|
||||
|
||||
if (throttleRestore) {
|
||||
assertThat(restorePause, greaterThan(0L));
|
||||
} else {
|
||||
assertThat(restorePause, equalTo(0L));
|
||||
}
|
||||
}
|
||||
|
||||
private boolean waitForIndex(String index, TimeValue timeout) throws InterruptedException {
|
||||
long start = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - start < timeout.millis()) {
|
||||
|
|
Loading…
Reference in New Issue