Restore chunksize of 512kb on recovery and remove configurability
This commit restores the chunk size of 512kb lost in a previous but unreleased refactoring. At the same time it removes the configurability of:

* `indices.recovery.file_chunk_size` - now fixed to 512kb
* `indices.recovery.translog_ops` - removed without replacement
* `indices.recovery.translog_size` - now fixed to 512kb
* `indices.recovery.compress` - file chunks are not compressed due to lucene's compression but translog operations are.

The compress option is gone entirely and compression is used where it makes sense. On sending files of the index we don't compress as we rely on the lucene compression for stored fields etc.

Relates to #15161
This commit is contained in:
parent 0809e4a65f
commit 414c04eb66
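The compression split is easiest to see in the two TransportRequestOptions built in the RecoverySourceHandler diff below; condensed here side by side as a sketch (builder calls copied from this diff, variable names illustrative):

    // File chunks: no transport compression. Lucene already compresses stored
    // fields and term dictionaries, so recompressing chunk bytes is wasted CPU.
    final TransportRequestOptions chunkSendOptions = TransportRequestOptions.builder()
            .withCompress(false)
            .withType(TransportRequestOptions.Type.RECOVERY)
            .withTimeout(recoverySettings.internalActionTimeout())
            .build();

    // Translog operations: plain, repetitive payloads that compress well, so
    // phase2 requests are always sent compressed.
    final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder()
            .withCompress(true)
            .withType(TransportRequestOptions.Type.RECOVERY)
            .withTimeout(recoverySettings.internalActionLongTimeout())
            .build();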
ClusterModule.java

@@ -164,10 +164,6 @@ public class ClusterModule extends AbstractModule {
         registerClusterDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME);
         registerClusterDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT, Validator.TIME);
         registerClusterDynamicSetting(MetaData.SETTING_READ_ONLY, Validator.EMPTY);
-        registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, Validator.POSITIVE_BYTES_SIZE);
-        registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, Validator.INTEGER);
-        registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, Validator.BYTES_SIZE);
-        registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_COMPRESS, Validator.EMPTY);
         registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, Validator.POSITIVE_INTEGER);
         registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, Validator.POSITIVE_INTEGER);
         registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
MetaData.java

@@ -746,8 +746,6 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, Fr
     /** All known byte-sized cluster settings. */
     public static final Set<String> CLUSTER_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet(
             IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC,
-            RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE,
-            RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE,
             RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC));
RecoverySettings.java

@@ -40,10 +40,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class RecoverySettings extends AbstractComponent implements Closeable {
 
-    public static final String INDICES_RECOVERY_FILE_CHUNK_SIZE = "indices.recovery.file_chunk_size";
-    public static final String INDICES_RECOVERY_TRANSLOG_OPS = "indices.recovery.translog_ops";
-    public static final String INDICES_RECOVERY_TRANSLOG_SIZE = "indices.recovery.translog_size";
-    public static final String INDICES_RECOVERY_COMPRESS = "indices.recovery.compress";
     public static final String INDICES_RECOVERY_CONCURRENT_STREAMS = "indices.recovery.concurrent_streams";
     public static final String INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS = "indices.recovery.concurrent_small_file_streams";
     public static final String INDICES_RECOVERY_MAX_BYTES_PER_SEC = "indices.recovery.max_bytes_per_sec";
@@ -75,12 +71,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
 
     public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb", "SMALL_FILE_CUTOFF_BYTES").bytes();
 
-    private volatile ByteSizeValue fileChunkSize;
-
-    private volatile boolean compress;
-    private volatile int translogOps;
-    private volatile ByteSizeValue translogSize;
-
     private volatile int concurrentStreams;
     private volatile int concurrentSmallFileStreams;
     private final ThreadPoolExecutor concurrentStreamPool;
@@ -94,16 +84,10 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
     private volatile TimeValue internalActionTimeout;
     private volatile TimeValue internalActionLongTimeout;
 
 
     @Inject
     public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
         super(settings);
 
-        this.fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, new ByteSizeValue(512, ByteSizeUnit.KB));
-        this.translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, 1000);
-        this.translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, new ByteSizeValue(512, ByteSizeUnit.KB));
-        this.compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, true);
-
         this.retryDelayStateSync = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(500));
         // doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
         // and we want to give the master time to remove a faulty node
@@ -132,8 +116,8 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
             rateLimiter = new SimpleRateLimiter(maxBytesPerSec.mbFrac());
         }
 
-        logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]",
-                maxBytesPerSec, concurrentStreams, fileChunkSize, translogSize, translogOps, compress);
+        logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}]",
+                maxBytesPerSec, concurrentStreams);
 
         nodeSettingsService.addListener(new ApplySettings());
     }
@@ -144,26 +128,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
         ThreadPool.terminate(concurrentSmallFileStreamPool, 1, TimeUnit.SECONDS);
     }
 
-    public ByteSizeValue fileChunkSize() {
-        return fileChunkSize;
-    }
-
-    public boolean compress() {
-        return compress;
-    }
-
-    public int translogOps() {
-        return translogOps;
-    }
-
-    public ByteSizeValue translogSize() {
-        return translogSize;
-    }
-
     public int concurrentStreams() {
         return concurrentStreams;
     }
 
     public ThreadPoolExecutor concurrentStreamPool() {
         return concurrentStreamPool;
     }
@@ -213,30 +177,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
             }
         }
 
-            ByteSizeValue fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, RecoverySettings.this.fileChunkSize);
-            if (!fileChunkSize.equals(RecoverySettings.this.fileChunkSize)) {
-                logger.info("updating [indices.recovery.file_chunk_size] from [{}] to [{}]", RecoverySettings.this.fileChunkSize, fileChunkSize);
-                RecoverySettings.this.fileChunkSize = fileChunkSize;
-            }
-
-            int translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, RecoverySettings.this.translogOps);
-            if (translogOps != RecoverySettings.this.translogOps) {
-                logger.info("updating [indices.recovery.translog_ops] from [{}] to [{}]", RecoverySettings.this.translogOps, translogOps);
-                RecoverySettings.this.translogOps = translogOps;
-            }
-
-            ByteSizeValue translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, RecoverySettings.this.translogSize);
-            if (!translogSize.equals(RecoverySettings.this.translogSize)) {
-                logger.info("updating [indices.recovery.translog_size] from [{}] to [{}]", RecoverySettings.this.translogSize, translogSize);
-                RecoverySettings.this.translogSize = translogSize;
-            }
-
-            boolean compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, RecoverySettings.this.compress);
-            if (compress != RecoverySettings.this.compress) {
-                logger.info("updating [indices.recovery.compress] from [{}] to [{}]", RecoverySettings.this.compress, compress);
-                RecoverySettings.this.compress = compress;
-            }
-
             int concurrentStreams = settings.getAsInt(INDICES_RECOVERY_CONCURRENT_STREAMS, RecoverySettings.this.concurrentStreams);
             if (concurrentStreams != RecoverySettings.this.concurrentStreams) {
                 logger.info("updating [indices.recovery.concurrent_streams] from [{}] to [{}]", RecoverySettings.this.concurrentStreams, concurrentStreams);
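With the per-chunk knobs gone, only the throttling and concurrency settings remain dynamic after these hunks. A hedged usage sketch, mirroring the integration-test style further down (assumes an ESIntegTestCase context where client() is available):

    // Only the throttling/concurrency settings can still be updated at runtime;
    // file_chunk_size, translog_ops, translog_size and compress are gone.
    assertTrue(client().admin().cluster().prepareUpdateSettings()
            .setTransientSettings(Settings.builder()
                    .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "40mb")
                    .put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, 4))
            .get().isAcknowledged());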
RecoverySourceHandler.java

@@ -49,6 +49,7 @@ import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -68,6 +69,7 @@ import java.util.stream.StreamSupport;
  */
 public class RecoverySourceHandler {
 
+    private static final int CHUNK_SIZE = 512 * 1000; // 512KB
     protected final ESLogger logger;
     // Shard that is going to be recovered (the "source")
     private final IndexShard shard;
@@ -79,7 +81,6 @@ public class RecoverySourceHandler {
     private final TransportService transportService;
 
     protected final RecoveryResponse response;
-    private final TransportRequestOptions requestOptions;
 
     private final CancellableThreads cancellableThreads = new CancellableThreads() {
         @Override
@@ -108,12 +109,6 @@ public class RecoverySourceHandler {
         this.shardId = this.request.shardId().id();
 
         this.response = new RecoveryResponse();
-        this.requestOptions = TransportRequestOptions.builder()
-                .withCompress(recoverySettings.compress())
-                .withType(TransportRequestOptions.Type.RECOVERY)
-                .withTimeout(recoverySettings.internalActionTimeout())
-                .build();
-
     }
 
     /**
@@ -218,7 +213,7 @@ public class RecoverySourceHandler {
                 totalSize += md.length();
             }
             List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
             phase1Files.addAll(diff.different);
             phase1Files.addAll(diff.missing);
             for (StoreFileMetaData md : phase1Files) {
                 if (request.metadataSnapshot().asMap().containsKey(md.name())) {
@@ -249,7 +244,7 @@ public class RecoverySourceHandler {
             });
             // How many bytes we've copied since we last called RateLimiter.pause
             final AtomicLong bytesSinceLastPause = new AtomicLong();
-            final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new RecoveryOutputStream(md, bytesSinceLastPause, translogView);
+            final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), CHUNK_SIZE);
             sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
             cancellableThreads.execute(() -> {
                 // Send the CLEAN_FILES request, which takes all of the files that
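The single changed line above is what restores the fixed chunk size: each per-file stream is wrapped in a BufferedOutputStream whose buffer is CHUNK_SIZE, so the underlying RecoveryOutputStream sees writes in full 512kb slabs regardless of how sendFiles writes into it. A self-contained sketch of that mechanism (the anonymous sink stands in for RecoveryOutputStream, which is not shown in this hunk):

    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class ChunkingDemo {
        static final int CHUNK_SIZE = 512 * 1000; // same value as RecoverySourceHandler

        public static void main(String[] args) throws IOException {
            // Stand-in for RecoveryOutputStream: each array write it receives would
            // become one chunk request (sendNextChunk) in the real handler.
            OutputStream sink = new OutputStream() {
                @Override
                public void write(int b) {
                    write(new byte[]{(byte) b}, 0, 1);
                }

                @Override
                public void write(byte[] b, int off, int len) {
                    System.out.println("chunk of " + len + " bytes");
                }
            };
            try (OutputStream out = new BufferedOutputStream(sink, CHUNK_SIZE)) {
                byte[] smallWrite = new byte[8 * 1024];
                for (int i = 0; i < 200; i++) {
                    out.write(smallWrite); // 200 * 8kb of small writes...
                }
            } // ...arrive below as three 512,000-byte chunks plus a final remainder
        }
    }

The copy loop in sendFiles can use any internal buffer size it likes; the wrapper alone decides the on-the-wire chunking.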
@@ -432,7 +427,7 @@ public class RecoverySourceHandler {
         }
 
         final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder()
-                .withCompress(recoverySettings.compress())
+                .withCompress(true)
                 .withType(TransportRequestOptions.Type.RECOVERY)
                 .withTimeout(recoverySettings.internalActionLongTimeout())
                 .build();
@@ -451,9 +446,9 @@ public class RecoverySourceHandler {
                 size += operation.estimateSize();
                 totalOperations++;
 
-                // Check if this request is past the size or bytes threshold, and
+                // Check if this request is past bytes threshold, and
                 // if so, send it off
-                if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) {
+                if (size >= CHUNK_SIZE) {
 
                     // don't throttle translog, since we lock for phase3 indexing,
                     // so we need to move it as fast as possible. Note, since we
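Phase 2 batching now keys off the same constant: operations accumulate until their estimated size reaches CHUNK_SIZE, so the former translog_ops/translog_size pair of flush triggers collapses into one. A minimal sketch of that loop shape (the snapshot iteration and sendBatch helper are illustrative stand-ins, not the real method names):

    long size = 0;
    List<Translog.Operation> operations = new ArrayList<>();
    Translog.Operation operation;
    while ((operation = snapshot.next()) != null) { // illustrative iteration
        operations.add(operation);
        size += operation.estimateSize();
        if (size >= CHUNK_SIZE) {      // the only flush trigger left
            sendBatch(operations);     // hypothetical helper wrapping the transport call
            operations.clear();
            size = 0;
        }
    }
    if (operations.isEmpty() == false) {
        sendBatch(operations);         // final partial batch
    }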
@@ -548,6 +543,11 @@ public class RecoverySourceHandler {
         }
 
         private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
+            final TransportRequestOptions chunkSendOptions = TransportRequestOptions.builder()
+                    .withCompress(false)
+                    .withType(TransportRequestOptions.Type.RECOVERY)
+                    .withTimeout(recoverySettings.internalActionTimeout())
+                    .build();
             cancellableThreads.execute(() -> {
                 // Pause using the rate limiter, if desired, to throttle the recovery
                 final long throttleTimeInNanos;
@@ -577,7 +577,7 @@ public class RecoverySourceHandler {
              * see how many translog ops we accumulate while copying files across the network. A future optimization
              * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
              */
-            throttleTimeInNanos), requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+            throttleTimeInNanos), chunkSendOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
             });
             if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
                 throw new IndexShardClosedException(request.shardId());
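Chunk sends are still throttled through Lucene's RateLimiter; the bytesSinceLastPause counter above exists so that pause() is consulted once per accumulated batch of bytes rather than on every small write. A hedged sketch of that accounting inside the chunk path (method names from Lucene's org.apache.lucene.store.RateLimiter; the surrounding wiring is condensed and assumed):

    // May be null when recovery is unthrottled (max_bytes_per_sec <= 0).
    final RateLimiter rl = recoverySettings.rateLimiter();
    final long throttleTimeInNanos;
    if (rl != null) {
        long bytes = bytesSinceLastPause.addAndGet(content.length());
        if (bytes > rl.getMinPauseCheckBytes()) {
            // Reset the counter before pausing so concurrent writers keep counting.
            bytesSinceLastPause.addAndGet(-bytes);
            throttleTimeInNanos = rl.pause(bytes);
        } else {
            throttleTimeInNanos = 0;
        }
    } else {
        throttleTimeInNanos = 0;
    }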
IndexRecoveryIT.java

@@ -143,7 +143,6 @@ public class IndexRecoveryIT extends ESIntegTestCase {
                 .setTransientSettings(Settings.builder()
                         // one chunk per sec..
                         .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, chunkSize, ByteSizeUnit.BYTES)
-                        .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, chunkSize, ByteSizeUnit.BYTES)
                 )
                 .get().isAcknowledged());
     }

@@ -152,7 +151,6 @@ public class IndexRecoveryIT extends ESIntegTestCase {
         assertTrue(client().admin().cluster().prepareUpdateSettings()
                 .setTransientSettings(Settings.builder()
                         .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "20mb")
-                        .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, "512kb")
                 )
                 .get().isAcknowledged());
     }
RecoverySettingsTests.java

@@ -32,24 +32,6 @@ public class RecoverySettingsTests extends ESSingleNodeTestCase {
     }
 
     public void testAllSettingsAreDynamicallyUpdatable() {
-        innerTestSettings(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, randomIntBetween(1, 200), ByteSizeUnit.BYTES, new Validator() {
-            @Override
-            public void validate(RecoverySettings recoverySettings, int expectedValue) {
-                assertEquals(expectedValue, recoverySettings.fileChunkSize().bytesAsInt());
-            }
-        });
-        innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, randomIntBetween(1, 200), new Validator() {
-            @Override
-            public void validate(RecoverySettings recoverySettings, int expectedValue) {
-                assertEquals(expectedValue, recoverySettings.translogOps());
-            }
-        });
-        innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, randomIntBetween(1, 200), ByteSizeUnit.BYTES, new Validator() {
-            @Override
-            public void validate(RecoverySettings recoverySettings, int expectedValue) {
-                assertEquals(expectedValue, recoverySettings.translogSize().bytesAsInt());
-            }
-        });
         innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, randomIntBetween(1, 200), new Validator() {
             @Override
             public void validate(RecoverySettings recoverySettings, int expectedValue) {

@@ -98,13 +80,6 @@ public class RecoverySettingsTests extends ESSingleNodeTestCase {
                 assertEquals(expectedValue, recoverySettings.internalActionLongTimeout().millis());
             }
         });
-
-        innerTestSettings(RecoverySettings.INDICES_RECOVERY_COMPRESS, false, new Validator() {
-            @Override
-            public void validate(RecoverySettings recoverySettings, boolean expectedValue) {
-                assertEquals(expectedValue, recoverySettings.compress());
-            }
-        });
     }
 
     private static class Validator {
TruncatedRecoveryIT.java

@@ -58,13 +58,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 @ESIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
 @SuppressCodecs("*") // test relies on exact file extensions
 public class TruncatedRecoveryIT extends ESIntegTestCase {
-    @Override
-    protected Settings nodeSettings(int nodeOrdinal) {
-        Settings.Builder builder = Settings.builder()
-                .put(super.nodeSettings(nodeOrdinal))
-                .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES));
-        return builder.build();
-    }
-
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
ESBackcompatTestCase.java

@@ -131,16 +131,6 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase {
         return file;
     }
 
-    @Override
-    protected Settings.Builder setRandomIndexSettings(Random random, Settings.Builder builder) {
-        if (globalCompatibilityVersion().before(Version.V_1_3_2)) {
-            // if we test against nodes before 1.3.2 we disable all the compression due to a known bug
-            // see #7210
-            builder.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, false);
-        }
-        return builder;
-    }
-
     /**
      * Retruns the tests compatibility version.
      */

@@ -250,13 +240,6 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase {
         Settings.Builder builder = Settings.builder().put(requiredSettings());
         builder.put(TransportModule.TRANSPORT_TYPE_KEY, "netty"); // run same transport / disco as external
         builder.put("node.mode", "network");
-
-        if (compatibilityVersion().before(Version.V_1_3_2)) {
-            // if we test against nodes before 1.3.2 we disable all the compression due to a known bug
-            // see #7210
-            builder.put(Transport.TransportSettings.TRANSPORT_TCP_COMPRESS, false)
-                    .put(RecoverySettings.INDICES_RECOVERY_COMPRESS, false);
-        }
         return builder.build();
     }
InternalTestCluster.java

@@ -447,10 +447,6 @@ public final class InternalTestCluster extends TestCluster {
             }
         }
 
-        if (random.nextBoolean()) {
-            builder.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, random.nextBoolean());
-        }
-
         if (random.nextBoolean()) {
             builder.put(NettyTransport.PING_SCHEDULE, RandomInts.randomIntBetween(random, 100, 2000) + "ms");
         }

@@ -1554,7 +1550,7 @@ public final class InternalTestCluster extends TestCluster {
         for (int i = 0; i < numNodes; i++) {
             asyncs.add(startNodeAsync(settings, version));
         }
 
         return () -> {
             List<String> ids = new ArrayList<>();
             for (Async<String> async : asyncs) {