diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index b2e793ba0ab..2b7786fdb1d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -164,10 +164,6 @@ public class ClusterModule extends AbstractModule { registerClusterDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME); registerClusterDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT, Validator.TIME); registerClusterDynamicSetting(MetaData.SETTING_READ_ONLY, Validator.EMPTY); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, Validator.POSITIVE_BYTES_SIZE); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, Validator.INTEGER); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, Validator.BYTES_SIZE); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_COMPRESS, Validator.EMPTY); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, Validator.POSITIVE_INTEGER); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, Validator.POSITIVE_INTEGER); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 68fa6e45d88..751f8a09ea5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -746,8 +746,6 @@ public class MetaData implements Iterable, Diffable, Fr /** All known byte-sized cluster settings. */ public static final Set CLUSTER_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet( IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, - RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, - RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC)); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 749ba4f3360..2841049eedc 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -40,10 +40,6 @@ import java.util.concurrent.TimeUnit; */ public class RecoverySettings extends AbstractComponent implements Closeable { - public static final String INDICES_RECOVERY_FILE_CHUNK_SIZE = "indices.recovery.file_chunk_size"; - public static final String INDICES_RECOVERY_TRANSLOG_OPS = "indices.recovery.translog_ops"; - public static final String INDICES_RECOVERY_TRANSLOG_SIZE = "indices.recovery.translog_size"; - public static final String INDICES_RECOVERY_COMPRESS = "indices.recovery.compress"; public static final String INDICES_RECOVERY_CONCURRENT_STREAMS = "indices.recovery.concurrent_streams"; public static final String INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS = "indices.recovery.concurrent_small_file_streams"; public static final String INDICES_RECOVERY_MAX_BYTES_PER_SEC = "indices.recovery.max_bytes_per_sec"; @@ -75,12 +71,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable { public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb", "SMALL_FILE_CUTOFF_BYTES").bytes(); - private volatile ByteSizeValue fileChunkSize; - - private volatile boolean compress; - private volatile int translogOps; - private volatile ByteSizeValue translogSize; - private volatile int concurrentStreams; private volatile int concurrentSmallFileStreams; private final ThreadPoolExecutor concurrentStreamPool; @@ -94,16 +84,10 @@ public class RecoverySettings extends AbstractComponent implements Closeable { private volatile TimeValue internalActionTimeout; private volatile TimeValue internalActionLongTimeout; - @Inject public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) { super(settings); - this.fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, new ByteSizeValue(512, ByteSizeUnit.KB)); - this.translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, 1000); - this.translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, new ByteSizeValue(512, ByteSizeUnit.KB)); - this.compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, true); - this.retryDelayStateSync = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(500)); // doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes) // and we want to give the master time to remove a faulty node @@ -132,8 +116,8 @@ public class RecoverySettings extends AbstractComponent implements Closeable { rateLimiter = new SimpleRateLimiter(maxBytesPerSec.mbFrac()); } - logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]", - maxBytesPerSec, concurrentStreams, fileChunkSize, translogSize, translogOps, compress); + logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}]", + maxBytesPerSec, concurrentStreams); nodeSettingsService.addListener(new ApplySettings()); } @@ -144,26 +128,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable { ThreadPool.terminate(concurrentSmallFileStreamPool, 1, TimeUnit.SECONDS); } - public ByteSizeValue fileChunkSize() { - return fileChunkSize; - } - - public boolean compress() { - return compress; - } - - public int translogOps() { - return translogOps; - } - - public ByteSizeValue translogSize() { - return translogSize; - } - - public int concurrentStreams() { - return concurrentStreams; - } - public ThreadPoolExecutor concurrentStreamPool() { return concurrentStreamPool; } @@ -213,30 +177,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable { } } - ByteSizeValue fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, RecoverySettings.this.fileChunkSize); - if (!fileChunkSize.equals(RecoverySettings.this.fileChunkSize)) { - logger.info("updating [indices.recovery.file_chunk_size] from [{}] to [{}]", RecoverySettings.this.fileChunkSize, fileChunkSize); - RecoverySettings.this.fileChunkSize = fileChunkSize; - } - - int translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, RecoverySettings.this.translogOps); - if (translogOps != RecoverySettings.this.translogOps) { - logger.info("updating [indices.recovery.translog_ops] from [{}] to [{}]", RecoverySettings.this.translogOps, translogOps); - RecoverySettings.this.translogOps = translogOps; - } - - ByteSizeValue translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, RecoverySettings.this.translogSize); - if (!translogSize.equals(RecoverySettings.this.translogSize)) { - logger.info("updating [indices.recovery.translog_size] from [{}] to [{}]", RecoverySettings.this.translogSize, translogSize); - RecoverySettings.this.translogSize = translogSize; - } - - boolean compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, RecoverySettings.this.compress); - if (compress != RecoverySettings.this.compress) { - logger.info("updating [indices.recovery.compress] from [{}] to [{}]", RecoverySettings.this.compress, compress); - RecoverySettings.this.compress = compress; - } - int concurrentStreams = settings.getAsInt(INDICES_RECOVERY_CONCURRENT_STREAMS, RecoverySettings.this.concurrentStreams); if (concurrentStreams != RecoverySettings.this.concurrentStreams) { logger.info("updating [indices.recovery.concurrent_streams] from [{}] to [{}]", RecoverySettings.this.concurrentStreams, concurrentStreams); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 73193161d12..138e49e5dae 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -49,6 +49,7 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -68,6 +69,7 @@ import java.util.stream.StreamSupport; */ public class RecoverySourceHandler { + private static final int CHUNK_SIZE = 512 * 1000; // 512KB protected final ESLogger logger; // Shard that is going to be recovered (the "source") private final IndexShard shard; @@ -79,7 +81,6 @@ public class RecoverySourceHandler { private final TransportService transportService; protected final RecoveryResponse response; - private final TransportRequestOptions requestOptions; private final CancellableThreads cancellableThreads = new CancellableThreads() { @Override @@ -108,12 +109,6 @@ public class RecoverySourceHandler { this.shardId = this.request.shardId().id(); this.response = new RecoveryResponse(); - this.requestOptions = TransportRequestOptions.builder() - .withCompress(recoverySettings.compress()) - .withType(TransportRequestOptions.Type.RECOVERY) - .withTimeout(recoverySettings.internalActionTimeout()) - .build(); - } /** @@ -218,7 +213,7 @@ public class RecoverySourceHandler { totalSize += md.length(); } List phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size()); - phase1Files.addAll(diff.different); + phase1Files.addAll(diff.different); phase1Files.addAll(diff.missing); for (StoreFileMetaData md : phase1Files) { if (request.metadataSnapshot().asMap().containsKey(md.name())) { @@ -249,7 +244,7 @@ public class RecoverySourceHandler { }); // How many bytes we've copied since we last called RateLimiter.pause final AtomicLong bytesSinceLastPause = new AtomicLong(); - final Function outputStreamFactories = (md) -> new RecoveryOutputStream(md, bytesSinceLastPause, translogView); + final Function outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), CHUNK_SIZE); sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); cancellableThreads.execute(() -> { // Send the CLEAN_FILES request, which takes all of the files that @@ -432,7 +427,7 @@ public class RecoverySourceHandler { } final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder() - .withCompress(recoverySettings.compress()) + .withCompress(true) .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionLongTimeout()) .build(); @@ -451,9 +446,9 @@ public class RecoverySourceHandler { size += operation.estimateSize(); totalOperations++; - // Check if this request is past the size or bytes threshold, and + // Check if this request is past bytes threshold, and // if so, send it off - if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) { + if (size >= CHUNK_SIZE) { // don't throttle translog, since we lock for phase3 indexing, // so we need to move it as fast as possible. Note, since we @@ -548,6 +543,11 @@ public class RecoverySourceHandler { } private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException { + final TransportRequestOptions chunkSendOptions = TransportRequestOptions.builder() + .withCompress(false) + .withType(TransportRequestOptions.Type.RECOVERY) + .withTimeout(recoverySettings.internalActionTimeout()) + .build(); cancellableThreads.execute(() -> { // Pause using the rate limiter, if desired, to throttle the recovery final long throttleTimeInNanos; @@ -577,7 +577,7 @@ public class RecoverySourceHandler { * see how many translog ops we accumulate while copying files across the network. A future optimization * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. */ - throttleTimeInNanos), requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + throttleTimeInNanos), chunkSendOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); }); if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us throw new IndexShardClosedException(request.shardId()); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 5ece413f796..52e8c6a81ba 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -143,7 +143,6 @@ public class IndexRecoveryIT extends ESIntegTestCase { .setTransientSettings(Settings.builder() // one chunk per sec.. .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, chunkSize, ByteSizeUnit.BYTES) - .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, chunkSize, ByteSizeUnit.BYTES) ) .get().isAcknowledged()); } @@ -152,7 +151,6 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertTrue(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "20mb") - .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, "512kb") ) .get().isAcknowledged()); } diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java b/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java index b0288042ecc..4bcbb8c8ee7 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java @@ -32,24 +32,6 @@ public class RecoverySettingsTests extends ESSingleNodeTestCase { } public void testAllSettingsAreDynamicallyUpdatable() { - innerTestSettings(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, randomIntBetween(1, 200), ByteSizeUnit.BYTES, new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, int expectedValue) { - assertEquals(expectedValue, recoverySettings.fileChunkSize().bytesAsInt()); - } - }); - innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, randomIntBetween(1, 200), new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, int expectedValue) { - assertEquals(expectedValue, recoverySettings.translogOps()); - } - }); - innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, randomIntBetween(1, 200), ByteSizeUnit.BYTES, new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, int expectedValue) { - assertEquals(expectedValue, recoverySettings.translogSize().bytesAsInt()); - } - }); innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, randomIntBetween(1, 200), new Validator() { @Override public void validate(RecoverySettings recoverySettings, int expectedValue) { @@ -98,13 +80,6 @@ public class RecoverySettingsTests extends ESSingleNodeTestCase { assertEquals(expectedValue, recoverySettings.internalActionLongTimeout().millis()); } }); - - innerTestSettings(RecoverySettings.INDICES_RECOVERY_COMPRESS, false, new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, boolean expectedValue) { - assertEquals(expectedValue, recoverySettings.compress()); - } - }); } private static class Validator { diff --git a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java index d94f72ea80f..3198c04121f 100644 --- a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java @@ -58,13 +58,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; @ESIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST) @SuppressCodecs("*") // test relies on exact file extensions public class TruncatedRecoveryIT extends ESIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal) { - Settings.Builder builder = Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES)); - return builder.build(); - } @Override protected Collection> nodePlugins() { diff --git a/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java b/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java index 47e163a6291..3e5c903a1ba 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java +++ b/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java @@ -131,16 +131,6 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase { return file; } - @Override - protected Settings.Builder setRandomIndexSettings(Random random, Settings.Builder builder) { - if (globalCompatibilityVersion().before(Version.V_1_3_2)) { - // if we test against nodes before 1.3.2 we disable all the compression due to a known bug - // see #7210 - builder.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, false); - } - return builder; - } - /** * Retruns the tests compatibility version. */ @@ -250,13 +240,6 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase { Settings.Builder builder = Settings.builder().put(requiredSettings()); builder.put(TransportModule.TRANSPORT_TYPE_KEY, "netty"); // run same transport / disco as external builder.put("node.mode", "network"); - - if (compatibilityVersion().before(Version.V_1_3_2)) { - // if we test against nodes before 1.3.2 we disable all the compression due to a known bug - // see #7210 - builder.put(Transport.TransportSettings.TRANSPORT_TCP_COMPRESS, false) - .put(RecoverySettings.INDICES_RECOVERY_COMPRESS, false); - } return builder.build(); } diff --git a/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index b86c8689699..7ae3226b66a 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -447,10 +447,6 @@ public final class InternalTestCluster extends TestCluster { } } - if (random.nextBoolean()) { - builder.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, random.nextBoolean()); - } - if (random.nextBoolean()) { builder.put(NettyTransport.PING_SCHEDULE, RandomInts.randomIntBetween(random, 100, 2000) + "ms"); } @@ -1554,7 +1550,7 @@ public final class InternalTestCluster extends TestCluster { for (int i = 0; i < numNodes; i++) { asyncs.add(startNodeAsync(settings, version)); } - + return () -> { List ids = new ArrayList<>(); for (Async async : asyncs) {