From 414c04eb66fb7619b5280e726bc91aac52dd7fc3 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 4 Dec 2015 10:08:11 +0100 Subject: [PATCH 1/7] Restore chunk size of 512kb on recovery and remove configurability This commit restores the chunk size of 512kb lost in a previous but unreleased refactoring. At the same time it removes the configurability of: * `indices.recovery.file_chunk_size` - now fixed to 512kb * `indices.recovery.translog_ops` - removed without replacement * `indices.recovery.translog_size` - now fixed to 512kb * `indices.recovery.compress` - file chunks are not compressed, since Lucene already compresses them, but translog operations are. The compress option is gone entirely and compression is used where it makes sense. When sending the files of the index we don't compress, as we rely on Lucene's compression for stored fields etc. Relates to #15161 --- .../elasticsearch/cluster/ClusterModule.java | 4 -- .../cluster/metadata/MetaData.java | 2 - .../indices/recovery/RecoverySettings.java | 64 +------------------ .../recovery/RecoverySourceHandler.java | 26 ++++---- .../indices/recovery/IndexRecoveryIT.java | 2 - .../recovery/RecoverySettingsTests.java | 25 -------- .../recovery/TruncatedRecoveryIT.java | 7 -- .../test/ESBackcompatTestCase.java | 17 ----- .../test/InternalTestCluster.java | 6 +- 9 files changed, 16 insertions(+), 137 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index b2e793ba0ab..2b7786fdb1d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -164,10 +164,6 @@ public class ClusterModule extends AbstractModule { registerClusterDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME); registerClusterDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT, Validator.TIME); registerClusterDynamicSetting(MetaData.SETTING_READ_ONLY, Validator.EMPTY); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, Validator.POSITIVE_BYTES_SIZE); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, Validator.INTEGER); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, Validator.BYTES_SIZE); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_COMPRESS, Validator.EMPTY); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, Validator.POSITIVE_INTEGER); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, Validator.POSITIVE_INTEGER); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 68fa6e45d88..751f8a09ea5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -746,8 +746,6 @@ public class MetaData implements Iterable, Diffable, Fr /** All known byte-sized cluster settings.
*/ public static final Set CLUSTER_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet( IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, - RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, - RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC)); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 749ba4f3360..2841049eedc 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -40,10 +40,6 @@ import java.util.concurrent.TimeUnit; */ public class RecoverySettings extends AbstractComponent implements Closeable { - public static final String INDICES_RECOVERY_FILE_CHUNK_SIZE = "indices.recovery.file_chunk_size"; - public static final String INDICES_RECOVERY_TRANSLOG_OPS = "indices.recovery.translog_ops"; - public static final String INDICES_RECOVERY_TRANSLOG_SIZE = "indices.recovery.translog_size"; - public static final String INDICES_RECOVERY_COMPRESS = "indices.recovery.compress"; public static final String INDICES_RECOVERY_CONCURRENT_STREAMS = "indices.recovery.concurrent_streams"; public static final String INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS = "indices.recovery.concurrent_small_file_streams"; public static final String INDICES_RECOVERY_MAX_BYTES_PER_SEC = "indices.recovery.max_bytes_per_sec"; @@ -75,12 +71,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable { public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb", "SMALL_FILE_CUTOFF_BYTES").bytes(); - private volatile ByteSizeValue fileChunkSize; - - private volatile boolean compress; - private volatile int translogOps; - private volatile ByteSizeValue translogSize; - private volatile int concurrentStreams; private volatile int concurrentSmallFileStreams; private final ThreadPoolExecutor concurrentStreamPool; @@ -94,16 +84,10 @@ public class RecoverySettings extends AbstractComponent implements Closeable { private volatile TimeValue internalActionTimeout; private volatile TimeValue internalActionLongTimeout; - @Inject public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) { super(settings); - this.fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, new ByteSizeValue(512, ByteSizeUnit.KB)); - this.translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, 1000); - this.translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, new ByteSizeValue(512, ByteSizeUnit.KB)); - this.compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, true); - this.retryDelayStateSync = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(500)); // doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes) // and we want to give the master time to remove a faulty node @@ -132,8 +116,8 @@ public class RecoverySettings extends AbstractComponent implements Closeable { rateLimiter = new SimpleRateLimiter(maxBytesPerSec.mbFrac()); } - logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]", - maxBytesPerSec, concurrentStreams, fileChunkSize, translogSize, translogOps, compress); + logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}], 
file_chunk_size [{}], translog_size [{}]", + maxBytesPerSec, concurrentStreams); nodeSettingsService.addListener(new ApplySettings()); } @@ -144,26 +128,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable { ThreadPool.terminate(concurrentSmallFileStreamPool, 1, TimeUnit.SECONDS); } - public ByteSizeValue fileChunkSize() { - return fileChunkSize; - } - - public boolean compress() { - return compress; - } - - public int translogOps() { - return translogOps; - } - - public ByteSizeValue translogSize() { - return translogSize; - } - - public int concurrentStreams() { - return concurrentStreams; - } - public ThreadPoolExecutor concurrentStreamPool() { return concurrentStreamPool; } @@ -213,30 +177,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable { } } - ByteSizeValue fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, RecoverySettings.this.fileChunkSize); - if (!fileChunkSize.equals(RecoverySettings.this.fileChunkSize)) { - logger.info("updating [indices.recovery.file_chunk_size] from [{}] to [{}]", RecoverySettings.this.fileChunkSize, fileChunkSize); - RecoverySettings.this.fileChunkSize = fileChunkSize; - } - - int translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, RecoverySettings.this.translogOps); - if (translogOps != RecoverySettings.this.translogOps) { - logger.info("updating [indices.recovery.translog_ops] from [{}] to [{}]", RecoverySettings.this.translogOps, translogOps); - RecoverySettings.this.translogOps = translogOps; - } - - ByteSizeValue translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, RecoverySettings.this.translogSize); - if (!translogSize.equals(RecoverySettings.this.translogSize)) { - logger.info("updating [indices.recovery.translog_size] from [{}] to [{}]", RecoverySettings.this.translogSize, translogSize); - RecoverySettings.this.translogSize = translogSize; - } - - boolean compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, RecoverySettings.this.compress); - if (compress != RecoverySettings.this.compress) { - logger.info("updating [indices.recovery.compress] from [{}] to [{}]", RecoverySettings.this.compress, compress); - RecoverySettings.this.compress = compress; - } - int concurrentStreams = settings.getAsInt(INDICES_RECOVERY_CONCURRENT_STREAMS, RecoverySettings.this.concurrentStreams); if (concurrentStreams != RecoverySettings.this.concurrentStreams) { logger.info("updating [indices.recovery.concurrent_streams] from [{}] to [{}]", RecoverySettings.this.concurrentStreams, concurrentStreams); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 73193161d12..138e49e5dae 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -49,6 +49,7 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -68,6 +69,7 @@ import java.util.stream.StreamSupport; */ public class RecoverySourceHandler { + private static final int CHUNK_SIZE = 512 * 1000; // 512KB protected final ESLogger logger; // Shard that is going to be recovered (the "source") private final 
IndexShard shard; @@ -79,7 +81,6 @@ public class RecoverySourceHandler { private final TransportService transportService; protected final RecoveryResponse response; - private final TransportRequestOptions requestOptions; private final CancellableThreads cancellableThreads = new CancellableThreads() { @Override @@ -108,12 +109,6 @@ public class RecoverySourceHandler { this.shardId = this.request.shardId().id(); this.response = new RecoveryResponse(); - this.requestOptions = TransportRequestOptions.builder() - .withCompress(recoverySettings.compress()) - .withType(TransportRequestOptions.Type.RECOVERY) - .withTimeout(recoverySettings.internalActionTimeout()) - .build(); - } /** @@ -218,7 +213,7 @@ public class RecoverySourceHandler { totalSize += md.length(); } List phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size()); - phase1Files.addAll(diff.different); + phase1Files.addAll(diff.different); phase1Files.addAll(diff.missing); for (StoreFileMetaData md : phase1Files) { if (request.metadataSnapshot().asMap().containsKey(md.name())) { @@ -249,7 +244,7 @@ public class RecoverySourceHandler { }); // How many bytes we've copied since we last called RateLimiter.pause final AtomicLong bytesSinceLastPause = new AtomicLong(); - final Function outputStreamFactories = (md) -> new RecoveryOutputStream(md, bytesSinceLastPause, translogView); + final Function outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), CHUNK_SIZE); sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); cancellableThreads.execute(() -> { // Send the CLEAN_FILES request, which takes all of the files that @@ -432,7 +427,7 @@ public class RecoverySourceHandler { } final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder() - .withCompress(recoverySettings.compress()) + .withCompress(true) .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionLongTimeout()) .build(); @@ -451,9 +446,9 @@ public class RecoverySourceHandler { size += operation.estimateSize(); totalOperations++; - // Check if this request is past the size or bytes threshold, and + // Check if this request is past bytes threshold, and // if so, send it off - if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) { + if (size >= CHUNK_SIZE) { // don't throttle translog, since we lock for phase3 indexing, // so we need to move it as fast as possible. Note, since we @@ -548,6 +543,11 @@ public class RecoverySourceHandler { } private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException { + final TransportRequestOptions chunkSendOptions = TransportRequestOptions.builder() + .withCompress(false) + .withType(TransportRequestOptions.Type.RECOVERY) + .withTimeout(recoverySettings.internalActionTimeout()) + .build(); cancellableThreads.execute(() -> { // Pause using the rate limiter, if desired, to throttle the recovery final long throttleTimeInNanos; @@ -577,7 +577,7 @@ public class RecoverySourceHandler { * see how many translog ops we accumulate while copying files across the network. A future optimization * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. 
*/ - throttleTimeInNanos), requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + throttleTimeInNanos), chunkSendOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); }); if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us throw new IndexShardClosedException(request.shardId()); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 5ece413f796..52e8c6a81ba 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -143,7 +143,6 @@ public class IndexRecoveryIT extends ESIntegTestCase { .setTransientSettings(Settings.builder() // one chunk per sec.. .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, chunkSize, ByteSizeUnit.BYTES) - .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, chunkSize, ByteSizeUnit.BYTES) ) .get().isAcknowledged()); } @@ -152,7 +151,6 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertTrue(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "20mb") - .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, "512kb") ) .get().isAcknowledged()); } diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java b/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java index b0288042ecc..4bcbb8c8ee7 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java @@ -32,24 +32,6 @@ public class RecoverySettingsTests extends ESSingleNodeTestCase { } public void testAllSettingsAreDynamicallyUpdatable() { - innerTestSettings(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, randomIntBetween(1, 200), ByteSizeUnit.BYTES, new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, int expectedValue) { - assertEquals(expectedValue, recoverySettings.fileChunkSize().bytesAsInt()); - } - }); - innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, randomIntBetween(1, 200), new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, int expectedValue) { - assertEquals(expectedValue, recoverySettings.translogOps()); - } - }); - innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, randomIntBetween(1, 200), ByteSizeUnit.BYTES, new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, int expectedValue) { - assertEquals(expectedValue, recoverySettings.translogSize().bytesAsInt()); - } - }); innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, randomIntBetween(1, 200), new Validator() { @Override public void validate(RecoverySettings recoverySettings, int expectedValue) { @@ -98,13 +80,6 @@ public class RecoverySettingsTests extends ESSingleNodeTestCase { assertEquals(expectedValue, recoverySettings.internalActionLongTimeout().millis()); } }); - - innerTestSettings(RecoverySettings.INDICES_RECOVERY_COMPRESS, false, new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, boolean expectedValue) { - assertEquals(expectedValue, recoverySettings.compress()); - } - }); } private static class Validator { diff --git a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java 
b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java index d94f72ea80f..3198c04121f 100644 --- a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java @@ -58,13 +58,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; @ESIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST) @SuppressCodecs("*") // test relies on exact file extensions public class TruncatedRecoveryIT extends ESIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal) { - Settings.Builder builder = Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES)); - return builder.build(); - } @Override protected Collection> nodePlugins() { diff --git a/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java b/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java index 47e163a6291..3e5c903a1ba 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java +++ b/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java @@ -131,16 +131,6 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase { return file; } - @Override - protected Settings.Builder setRandomIndexSettings(Random random, Settings.Builder builder) { - if (globalCompatibilityVersion().before(Version.V_1_3_2)) { - // if we test against nodes before 1.3.2 we disable all the compression due to a known bug - // see #7210 - builder.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, false); - } - return builder; - } - /** * Retruns the tests compatibility version. 
*/ @@ -250,13 +240,6 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase { Settings.Builder builder = Settings.builder().put(requiredSettings()); builder.put(TransportModule.TRANSPORT_TYPE_KEY, "netty"); // run same transport / disco as external builder.put("node.mode", "network"); - - if (compatibilityVersion().before(Version.V_1_3_2)) { - // if we test against nodes before 1.3.2 we disable all the compression due to a known bug - // see #7210 - builder.put(Transport.TransportSettings.TRANSPORT_TCP_COMPRESS, false) - .put(RecoverySettings.INDICES_RECOVERY_COMPRESS, false); - } return builder.build(); } diff --git a/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index b86c8689699..7ae3226b66a 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -447,10 +447,6 @@ public final class InternalTestCluster extends TestCluster { } } - if (random.nextBoolean()) { - builder.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, random.nextBoolean()); - } - if (random.nextBoolean()) { builder.put(NettyTransport.PING_SCHEDULE, RandomInts.randomIntBetween(random, 100, 2000) + "ms"); } @@ -1554,7 +1550,7 @@ public final class InternalTestCluster extends TestCluster { for (int i = 0; i < numNodes; i++) { asyncs.add(startNodeAsync(settings, version)); } - + return () -> { List ids = new ArrayList<>(); for (Async async : asyncs) { From 37b60bd76bf86375684560d492fd02899922b196 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 7 Dec 2015 10:19:04 +0100 Subject: [PATCH 2/7] more accurate chunk size --- .../elasticsearch/indices/recovery/RecoverySourceHandler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 138e49e5dae..d9f697ec6a1 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -36,7 +36,9 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; +import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.CancellableThreads.Interruptable; import org.elasticsearch.index.engine.RecoveryEngineException; @@ -69,7 +71,7 @@ import java.util.stream.StreamSupport; */ public class RecoverySourceHandler { - private static final int CHUNK_SIZE = 512 * 1000; // 512KB + private static final int CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB).bytesAsInt(); protected final ESLogger logger; // Shard that is going to be recovered (the "source") private final IndexShard shard; From 8b8de5cb4b6398e3b9655a53fcc0b0b9010fc37d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 8 Dec 2015 11:54:37 +0100 Subject: [PATCH 3/7] Don't send chunks after a previous send call failed. 
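Without a guard, the BufferedOutputStream wrapper introduced in the first commit can flush its still-buffered bytes from close() after a write() has already thrown, firing a second doomed send whose exception can suppress or replace the original failure and keep the recovery retrying. A stripped-down illustration of that flush-on-close behavior, separate from the patch itself (ThrowingStream is a hypothetical stand-in for RecoveryOutputStream in which every underlying write models one failed chunk send):

    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class FlushOnCloseSketch {
        // hypothetical stand-in: each underlying write is one chunk send, and it always fails
        static class ThrowingStream extends OutputStream {
            @Override
            public void write(int b) throws IOException {
                throw new IOException("chunk send failed");
            }

            @Override
            public void write(byte[] b, int off, int len) throws IOException {
                throw new IOException("chunk send failed");
            }
        }

        public static void main(String[] args) {
            try (OutputStream out = new BufferedOutputStream(new ThrowingStream(), 4)) {
                out.write(new byte[]{1, 2}); // buffered only, nothing sent yet
                out.write(new byte[8]);      // overflows the buffer -> flush -> first failed send
            } catch (IOException e) {
                // close() flushes the leftover buffered bytes and triggers a second failed send;
                // try-with-resources records it as suppressed, but other close paths can lose
                // the original exception entirely -- hence the failed flag added below
                System.out.println("primary: " + e.getMessage());
                for (Throwable suppressed : e.getSuppressed()) {
                    System.out.println("suppressed: " + suppressed.getMessage());
                }
            }
        }
    }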
--- .../recovery/RecoverySourceHandler.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index d9f697ec6a1..b78f8523f1a 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -525,6 +525,7 @@ public class RecoverySourceHandler { private final AtomicLong bytesSinceLastPause; private final Translog.View translogView; private long position = 0; + private boolean failed = false; RecoveryOutputStream(StoreFileMetaData md, AtomicLong bytesSinceLastPause, Translog.View translogView) { this.md = md; @@ -539,9 +540,21 @@ public class RecoverySourceHandler { @Override public final void write(byte[] b, int offset, int length) throws IOException { - sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length); - position += length; - assert md.length() >= position : "length: " + md.length() + " but positions was: " + position; + if (failed == false) { + /* since we are an outputstream a wrapper might get flushed on close after we threw an exception. + * that might cause another exception from the other side of the recovery since we are in a bad state + * due to a corrupted file stream etc. the biggest issue is that we will turn into a loop of exceptions + * and we will always suppress the original one which might cause the recovery to retry over and over again. + * To prevent this we try to not send chunks again after we failed once.*/ + try { + sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length); + position += length; + assert md.length() >= position : "length: " + md.length() + " but positions was: " + position; + } catch (Exception e) { + failed = true; + throw e; + } + } } private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException { From 449f9e725896260ff62310f8c046b2d62df1ee97 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 8 Dec 2015 12:41:19 +0100 Subject: [PATCH 4/7] don't allow writing single bytes on recovery outputstream --- .../indices/recovery/RecoverySourceHandler.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index b78f8523f1a..0e1efaf812c 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -535,7 +535,7 @@ public class RecoverySourceHandler { @Override public final void write(int b) throws IOException { - write(new byte[]{(byte) b}, 0, 1); + throw new UnsupportedOperationException("we can't send single bytes over the wire"); } @Override @@ -546,13 +546,16 @@ public class RecoverySourceHandler { * due to a corrupted file stream etc. the biggest issue is that we will turn into a loop of exceptions * and we will always suppress the original one which might cause the recovery to retry over and over again. 
* To prevent this we try to not send chunks again after we failed once.*/ + boolean success = false; try { sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length); position += length; assert md.length() >= position : "length: " + md.length() + " but positions was: " + position; - } catch (Exception e) { - failed = true; - throw e; + success = true; + } finally { + if (success == false) { + failed = true; + } } } } From 7d6663e6e5f33ff91731e87e77611da7026c7833 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 8 Dec 2015 14:32:48 +0100 Subject: [PATCH 5/7] use AtomicBoolean instead of a simple boolean --- .../indices/recovery/RecoverySourceHandler.java | 7 ++++--- .../elasticsearch/indices/recovery/IndexRecoveryIT.java | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 0e1efaf812c..ffd2c8edc6b 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -60,6 +60,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.StreamSupport; @@ -525,7 +526,7 @@ public class RecoverySourceHandler { private final AtomicLong bytesSinceLastPause; private final Translog.View translogView; private long position = 0; - private boolean failed = false; + private final AtomicBoolean failed = new AtomicBoolean(false); RecoveryOutputStream(StoreFileMetaData md, AtomicLong bytesSinceLastPause, Translog.View translogView) { this.md = md; @@ -540,7 +541,7 @@ public class RecoverySourceHandler { @Override public final void write(byte[] b, int offset, int length) throws IOException { - if (failed == false) { + if (failed.get() == false) { /* since we are an outputstream a wrapper might get flushed on close after we threw an exception. * that might cause another exception from the other side of the recovery since we are in a bad state * due to a corrupted file stream etc. the biggest issue is that we will turn into a loop of exceptions @@ -554,7 +555,7 @@ public class RecoverySourceHandler { success = true; } finally { if (success == false) { - failed = true; + failed.compareAndSet(false, true); } } } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 52e8c6a81ba..ae63c8d4b92 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -138,7 +138,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { } private void slowDownRecovery(ByteSizeValue shardSize) { - long chunkSize = shardSize.bytes() / 10; + long chunkSize = shardSize.bytes() / 5; assertTrue(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() // one chunk per sec.. 
From 12f905a6759ce962ab76e89a1c5b06dc650a12cf Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 11 Dec 2015 09:44:08 +0100 Subject: [PATCH 6/7] Make chunkSize configurable for tests and use correct close handling for closing streams to not hide the original exception --- .../org/elasticsearch/common/io/Streams.java | 32 +++++++--------- .../indices/recovery/RecoverySettings.java | 13 +++++++ .../recovery/RecoverySourceHandler.java | 37 ++++++------------- .../indices/recovery/IndexRecoveryIT.java | 12 +++++- .../recovery/TruncatedRecoveryIT.java | 5 +++ 5 files changed, 54 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/io/Streams.java b/core/src/main/java/org/elasticsearch/common/io/Streams.java index 8adf0919e50..36b1d9445b0 100644 --- a/core/src/main/java/org/elasticsearch/common/io/Streams.java +++ b/core/src/main/java/org/elasticsearch/common/io/Streams.java @@ -20,6 +20,8 @@ package org.elasticsearch.common.io; import java.nio.charset.StandardCharsets; + +import org.apache.lucene.util.IOUtils; import org.elasticsearch.common.util.Callback; import java.io.BufferedReader; @@ -68,6 +70,7 @@ public abstract class Streams { public static long copy(InputStream in, OutputStream out, byte[] buffer) throws IOException { Objects.requireNonNull(in, "No InputStream specified"); Objects.requireNonNull(out, "No OutputStream specified"); + boolean success = false; try { long byteCount = 0; int bytesRead; @@ -76,17 +79,13 @@ public abstract class Streams { byteCount += bytesRead; } out.flush(); + success = true; return byteCount; } finally { - try { - in.close(); - } catch (IOException ex) { - // do nothing - } - try { - out.close(); - } catch (IOException ex) { - // do nothing + if (success) { + IOUtils.close(in, out); + } else { + IOUtils.closeWhileHandlingException(in, out); } } } @@ -130,6 +129,7 @@ public abstract class Streams { public static int copy(Reader in, Writer out) throws IOException { Objects.requireNonNull(in, "No Reader specified"); Objects.requireNonNull(out, "No Writer specified"); + boolean success = false; try { int byteCount = 0; char[] buffer = new char[BUFFER_SIZE]; @@ -139,17 +139,13 @@ public abstract class Streams { byteCount += bytesRead; } out.flush(); + success = true; return byteCount; } finally { - try { - in.close(); - } catch (IOException ex) { - // do nothing - } - try { - out.close(); - } catch (IOException ex) { - // do nothing + if (success) { + IOUtils.close(in, out); + } else { + IOUtils.closeWhileHandlingException(in, out); } } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 2841049eedc..f2f06921680 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -71,6 +71,8 @@ public class RecoverySettings extends AbstractComponent implements Closeable { public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb", "SMALL_FILE_CUTOFF_BYTES").bytes(); + public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); + private volatile int concurrentStreams; private volatile int concurrentSmallFileStreams; private final ThreadPoolExecutor concurrentStreamPool; @@ -84,6 +86,8 @@ public class RecoverySettings extends AbstractComponent implements Closeable { private volatile TimeValue internalActionTimeout; private volatile
TimeValue internalActionLongTimeout; + private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; + @Inject public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) { super(settings); @@ -160,6 +164,15 @@ public class RecoverySettings extends AbstractComponent implements Closeable { return internalActionLongTimeout; } + public ByteSizeValue getChunkSize() { return chunkSize; } + + void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests + if (chunkSize.bytesAsInt() <= 0) { + throw new IllegalArgumentException("chunkSize must be > 0"); + } + this.chunkSize = chunkSize; + } + class ApplySettings implements NodeSettingsService.Listener { @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index ffd2c8edc6b..036260624a6 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -72,7 +72,6 @@ import java.util.stream.StreamSupport; */ public class RecoverySourceHandler { - private static final int CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB).bytesAsInt(); protected final ESLogger logger; // Shard that is going to be recovered (the "source") private final IndexShard shard; @@ -82,6 +81,7 @@ public class RecoverySourceHandler { private final StartRecoveryRequest request; private final RecoverySettings recoverySettings; private final TransportService transportService; + private final int chunkSizeInBytes; protected final RecoveryResponse response; @@ -110,7 +110,7 @@ public class RecoverySourceHandler { this.transportService = transportService; this.indexName = this.request.shardId().index().name(); this.shardId = this.request.shardId().id(); - + this.chunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt(); this.response = new RecoveryResponse(); } @@ -247,7 +247,7 @@ public class RecoverySourceHandler { }); // How many bytes we've copied since we last called RateLimiter.pause final AtomicLong bytesSinceLastPause = new AtomicLong(); - final Function outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), CHUNK_SIZE); + final Function outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), chunkSizeInBytes); sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); cancellableThreads.execute(() -> { // Send the CLEAN_FILES request, which takes all of the files that @@ -451,7 +451,7 @@ public class RecoverySourceHandler { // Check if this request is past bytes threshold, and // if so, send it off - if (size >= CHUNK_SIZE) { + if (size >= chunkSizeInBytes) { // don't throttle translog, since we lock for phase3 indexing, // so we need to move it as fast as possible. 
Note, since we @@ -526,7 +526,7 @@ public class RecoverySourceHandler { private final AtomicLong bytesSinceLastPause; private final Translog.View translogView; private long position = 0; - private final AtomicBoolean failed = new AtomicBoolean(false); RecoveryOutputStream(StoreFileMetaData md, AtomicLong bytesSinceLastPause, Translog.View translogView) { this.md = md; @@ -541,24 +540,9 @@ public class RecoverySourceHandler { @Override public final void write(byte[] b, int offset, int length) throws IOException { - if (failed.get() == false) { - /* since we are an outputstream a wrapper might get flushed on close after we threw an exception. - * that might cause another exception from the other side of the recovery since we are in a bad state - * due to a corrupted file stream etc. the biggest issue is that we will turn into a loop of exceptions - * and we will always suppress the original one which might cause the recovery to retry over and over again. * To prevent this we try to not send chunks again after we failed once.*/ - boolean success = false; - try { - sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length); - position += length; - assert md.length() >= position : "length: " + md.length() + " but positions was: " + position; - success = true; - } finally { - if (success == false) { - failed.compareAndSet(false, true); - } - } - } + sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length); + position += length; + assert md.length() >= position : "length: " + md.length() + " but positions was: " + position; } private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException { @@ -689,9 +673,10 @@ public class RecoverySourceHandler { pool = recoverySettings.concurrentSmallFileStreamPool(); } Future future = pool.submit(() -> { - try (final OutputStream outputStream = outputStreamFactory.apply(md); - final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) { - Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStream); + try (final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) { + // it's fine that only the indexInput is in the try/with block. The copy method handles + // exceptions during close correctly and doesn't hide the original exception. + Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md)); } return null; }); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index ae63c8d4b92..4cf60289a18 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -138,7 +138,10 @@ public class IndexRecoveryIT extends ESIntegTestCase { } private void slowDownRecovery(ByteSizeValue shardSize) { - long chunkSize = shardSize.bytes() / 5; + long chunkSize = Math.max(1, shardSize.bytes() / 10); + for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) { + setChunkSize(settings, new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES)); + } assertTrue(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() // one chunk per sec..
@@ -148,6 +151,9 @@ public class IndexRecoveryIT extends ESIntegTestCase { } private void restoreRecoverySpeed() { + for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) { + setChunkSize(settings, RecoverySettings.DEFAULT_CHUNK_SIZE); + } assertTrue(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "20mb") @@ -629,4 +635,8 @@ public class IndexRecoveryIT extends ESIntegTestCase { transport.sendRequest(node, requestId, action, request, options); } } + + public static void setChunkSize(RecoverySettings recoverySettings, ByteSizeValue chunksSize) { + recoverySettings.setChunkSize(chunksSize); + } } diff --git a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java index 3198c04121f..4d111469505 100644 --- a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.indices.recovery.IndexRecoveryIT; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryTarget; @@ -71,6 +72,10 @@ public class TruncatedRecoveryIT extends ESIntegTestCase { * Later we allow full recovery to ensure we can still recover and don't run into corruptions. */ public void testCancelRecoveryAndResume() throws Exception { + for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) { + IndexRecoveryIT.setChunkSize(settings, new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES)); + } + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get(); List dataNodeStats = new ArrayList<>(); for (NodeStats stat : nodeStats.getNodes()) { From db15682b3ca33de6c79d0d3a8a397ede5f748e7e Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 11 Dec 2015 15:16:14 +0100 Subject: [PATCH 7/7] apply review comments from @bleskes --- .../org/elasticsearch/indices/recovery/RecoverySettings.java | 2 +- .../elasticsearch/indices/recovery/RecoverySourceHandler.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index f2f06921680..6db38d59e85 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -120,7 +120,7 @@ public class RecoverySettings extends AbstractComponent implements Closeable { rateLimiter = new SimpleRateLimiter(maxBytesPerSec.mbFrac()); } - logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}]", + logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}]", maxBytesPerSec, concurrentStreams); nodeSettingsService.addListener(new ApplySettings()); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 
036260624a6..4057af00841 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -547,7 +547,7 @@ public class RecoverySourceHandler { private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException { final TransportRequestOptions chunkSendOptions = TransportRequestOptions.builder() - .withCompress(false) + .withCompress(false) // lucene files are already compressed and therefore compressing this won't really help much, so we are saving the CPU for other things .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionTimeout()) .build();
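Taken together, the series makes file recovery send fixed 512kb chunks without any cluster setting: Streams.copy feeds relatively small buffers into the BufferedOutputStream wrapped around RecoveryOutputStream, and each flush of that buffer turns into one uncompressed RecoveryFileChunkRequest. A minimal, runnable sketch of just that buffering arithmetic (RecordingStream is a hypothetical stand-in for RecoveryOutputStream, and the 8kb copy granularity is an assumption standing in for whatever Streams.copy actually uses; it only matters that it is smaller than the chunk size):

    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class ChunkingSketch {
        static final int CHUNK_SIZE = 512 * 1024; // in bytes, matching RecoverySettings.DEFAULT_CHUNK_SIZE

        // hypothetical stand-in: each underlying write would be one file-chunk transport request
        static class RecordingStream extends OutputStream {
            @Override
            public void write(int b) {
                throw new UnsupportedOperationException("we can't send single bytes over the wire");
            }

            @Override
            public void write(byte[] b, int off, int len) {
                System.out.println("sending chunk of " + len + " bytes");
            }
        }

        public static void main(String[] args) throws IOException {
            long fileLength = 2L * CHUNK_SIZE + 100;  // pretend this is a segment file
            byte[] copyBuffer = new byte[8 * 1024];   // assumed copy granularity
            try (OutputStream out = new BufferedOutputStream(new RecordingStream(), CHUNK_SIZE)) {
                for (long remaining = fileLength; remaining > 0; remaining -= copyBuffer.length) {
                    out.write(copyBuffer, 0, (int) Math.min(copyBuffer.length, remaining));
                }
            } // prints two 524288-byte chunks, then the 100-byte tail flushed by close()
        }
    }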
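Translog operations go the other way: their requests are still compressed (the request options are built with withCompress(true)), and with indices.recovery.translog_ops removed they are batched purely by accumulated estimated size against the same 512kb threshold. A sketch of that size-only batching loop under those assumptions, with Operation and send as hypothetical stand-ins for Translog.Operation and the translog-batch transport call:

    import java.util.ArrayList;
    import java.util.List;

    class TranslogBatchingSketch {
        static final long CHUNK_SIZE = 512 * 1024; // same fixed threshold as the file chunks

        interface Operation {
            long estimateSize(); // stand-in for Translog.Operation#estimateSize()
        }

        static void sendBatched(Iterable<Operation> snapshot) {
            List<Operation> batch = new ArrayList<>();
            long size = 0;
            for (Operation op : snapshot) {
                batch.add(op);
                size += op.estimateSize();
                if (size >= CHUNK_SIZE) { // size threshold only; the ops-count trigger is gone
                    send(batch);          // models one compressed translog-batch request
                    batch = new ArrayList<>();
                    size = 0;
                }
            }
            if (batch.isEmpty() == false) {
                send(batch); // final partial batch
            }
        }

        static void send(List<Operation> batch) {
            System.out.println("sending " + batch.size() + " ops");
        }
    }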